You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/07 17:57:52 UTC
svn commit: r1565726 [1/6] - in /qpid/trunk/qpid/java: ./
amqp-1-0-client-jms/ amqp-1-0-client/ amqp-1-0-common/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrad...
Author: rgodfrey
Date: Fri Feb 7 16:57:49 2014
New Revision: 1565726
URL: http://svn.apache.org/r1565726
Log:
QPID-5504 : Refactoring to allow consumers to subscribe to nodes other than queues, and messages to be sent to nodes other than exchanges (merged from separate branch)
Added:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/
- copied from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/consumer/
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/
- copied from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/consumer/
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java
- copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java
Removed:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
Modified:
qpid/trunk/qpid/java/ (props changed)
qpid/trunk/qpid/java/amqp-1-0-client/ (props changed)
qpid/trunk/qpid/java/amqp-1-0-client-jms/ (props changed)
qpid/trunk/qpid/java/amqp-1-0-common/ (props changed)
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
qpid/trunk/qpid/java/broker-core/ (props changed)
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ (props changed)
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (contents, props changed)
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ (props changed)
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ (props changed)
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (contents, props changed)
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java (props changed)
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
qpid/trunk/qpid/java/test-profiles/ (props changed)
qpid/trunk/qpid/java/test-profiles/Java010Excludes
Propchange: qpid/trunk/qpid/java/
------------------------------------------------------------------------------
Merged /qpid/branches/java-broker-amqp-1-0-management/java:r1562456-1565710
Propchange: qpid/trunk/qpid/java/amqp-1-0-client/
------------------------------------------------------------------------------
Merged /qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-client:r1562456-1565710
Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/
------------------------------------------------------------------------------
Merged /qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-client-jms:r1562456-1565710
Propchange: qpid/trunk/qpid/java/amqp-1-0-common/
------------------------------------------------------------------------------
Merged /qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common:r1562456-1565710
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java Fri Feb 7 16:57:49 2014
@@ -69,7 +69,7 @@ public class MessageMetaDataBinding exte
buf.position(1);
buf = buf.slice();
- metaData.writeToBuffer(0, buf);
+ metaData.writeToBuffer(buf);
tupleOutput.writeInt(bodySize);
tupleOutput.writeFast(underlying);
}
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java Fri Feb 7 16:57:49 2014
@@ -73,7 +73,7 @@ public class PreparedTransactionBinding
output.writeInt(records.length);
for(Transaction.Record record : records)
{
- UUID id = record.getQueue().getId();
+ UUID id = record.getResource().getId();
output.writeLong(id.getMostSignificantBits());
output.writeLong(id.getLeastSignificantBits());
output.writeLong(record.getMessage().getMessageNumber());
@@ -93,7 +93,7 @@ public class PreparedTransactionBinding
_queueId = queueId;
}
- public TransactionLogResource getQueue()
+ public TransactionLogResource getResource()
{
return this;
}
@@ -119,9 +119,21 @@ public class PreparedTransactionBinding
}
@Override
+ public String getName()
+ {
+ return _queueId.toString();
+ }
+
+ @Override
public UUID getId()
{
return _queueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
}
}
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java Fri Feb 7 16:57:49 2014
@@ -741,7 +741,7 @@ public class UpgradeFrom4To5 extends Abs
buf.position(1);
buf = buf.slice();
- metaData.writeToBuffer(0, buf);
+ metaData.writeToBuffer(buf);
output.writeInt(bodySize);
output.writeFast(underlying);
}
Modified: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Fri Feb 7 16:57:49 2014
@@ -416,10 +416,22 @@ public class BDBMessageStoreTest extends
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
+ public String getName()
+ {
+ return getId().toString();
+ }
+
+ @Override
public UUID getId()
{
return mockQueueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
};
Transaction txn = log.newTransaction();
@@ -454,10 +466,22 @@ public class BDBMessageStoreTest extends
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
+ public String getName()
+ {
+ return getId().toString();
+ }
+
+ @Override
public UUID getId()
{
return mockQueueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
};
Transaction txn = log.newTransaction();
@@ -511,10 +535,22 @@ public class BDBMessageStoreTest extends
TransactionLogResource mockQueue = new TransactionLogResource()
{
@Override
+ public String getName()
+ {
+ return getId().toString();
+ }
+
+ @Override
public UUID getId()
{
return mockQueueId;
}
+
+ @Override
+ public boolean isDurable()
+ {
+ return true;
+ }
};
Transaction txn = log.newTransaction();
Propchange: qpid/trunk/qpid/java/broker-core/
------------------------------------------------------------------------------
Merged /qpid/branches/java-broker-amqp-1-0-management/java/broker-core:r1562456-1565710
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Feb 7 16:57:49 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -33,14 +34,17 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
@@ -66,7 +70,7 @@ public abstract class AbstractExchange i
private VirtualHost _virtualHost;
- private final List<Task> _closeTaskList = new CopyOnWriteArrayList<Task>();
+ private final List<Action<Exchange>> _closeTaskList = new CopyOnWriteArrayList<Action<Exchange>>();
/**
* Whether the exchange is automatically deleted once all queues have detached from it
@@ -138,6 +142,12 @@ public abstract class AbstractExchange i
if(_closed.compareAndSet(false,true))
{
+ List<Binding> bindings = new ArrayList<Binding>(_bindings);
+ for(Binding binding : bindings)
+ {
+ removeBinding(binding);
+ }
+
if(_alternateExchange != null)
{
_alternateExchange.removeReference(this);
@@ -145,9 +155,9 @@ public abstract class AbstractExchange i
CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED());
- for(Task task : _closeTaskList)
+ for(Action<Exchange> task : _closeTaskList)
{
- task.onClose(this);
+ task.performAction(this);
}
_closeTaskList.clear();
}
@@ -300,12 +310,12 @@ public abstract class AbstractExchange i
return !_referrers.isEmpty();
}
- public void addCloseTask(final Task task)
+ public void addCloseTask(final Action<Exchange> task)
{
_closeTaskList.add(task);
}
- public void removeCloseTask(final Task task)
+ public void removeCloseTask(final Action<Exchange> task)
{
_closeTaskList.remove(task);
}
@@ -421,7 +431,7 @@ public abstract class AbstractExchange i
public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final BaseQueue.PostEnqueueAction postEnqueueAction)
+ final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
{
List<? extends BaseQueue> queues = route(message, instanceProperties);
@@ -579,8 +589,6 @@ public abstract class AbstractExchange i
{
doRemoveBinding(b);
queue.removeBinding(b);
- removeCloseTask(b);
- queue.removeQueueDeleteTask(b);
if (b.isDurable())
{
@@ -659,8 +667,6 @@ public abstract class AbstractExchange i
DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
}
- queue.addQueueDeleteTask(b);
- addCloseTask(b);
queue.addBinding(b);
doAddBinding(b);
b.logCreation();
@@ -673,7 +679,7 @@ public abstract class AbstractExchange i
}
}
- private final class BindingImpl extends Binding implements AMQQueue.Task, Task
+ private final class BindingImpl extends Binding
{
private final BindingLogSubject _logSubject;
//TODO : persist creation time
@@ -689,12 +695,6 @@ public abstract class AbstractExchange i
}
-
- public void doTask(final AMQQueue queue) throws AMQException
- {
- removeBinding(this);
- }
-
public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException
{
removeBinding(this);
@@ -729,11 +729,6 @@ public abstract class AbstractExchange i
}
- public static interface Task
- {
- public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
- }
-
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Fri Feb 7 16:57:49 2014
@@ -32,18 +32,22 @@ import org.apache.qpid.AMQInternalExcept
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class DefaultExchange implements Exchange
@@ -334,7 +338,7 @@ public class DefaultExchange implements
public final int send(final ServerMessage message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final BaseQueue.PostEnqueueAction postEnqueueAction)
+ final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
{
final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java Fri Feb 7 16:57:49 2014
@@ -24,20 +24,16 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
-public interface Exchange extends ExchangeReferrer
+public interface Exchange extends ExchangeReferrer, MessageDestination
{
void initialise(UUID id, VirtualHost host, String name, boolean durable, boolean autoDelete)
throws AMQException;
@@ -95,19 +91,6 @@ public interface Exchange extends Exchan
void close() throws AMQException;
/**
- * Routes a message
- * @param message the message to be routed
- * @param instanceProperties the instance properties
- * @param txn the transaction to enqueue within
- * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
- * @return the number of queues in which the message was enqueued performed
- */
- int send(ServerMessage message,
- InstanceProperties instanceProperties,
- ServerTransaction txn,
- BaseQueue.PostEnqueueAction postEnqueueAction);
-
- /**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
* @param bindingKey
* @param arguments
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java Fri Feb 7 16:57:49 2014
@@ -35,7 +35,6 @@ import org.apache.qpid.server.logging.Lo
* 2) We can set new actors at the point we have enough information. i.e.
* - Set a low level ConnectionActor when processing bytes from the wire.
* - Set a ChannelActor when we are processing the frame
- * - Set a SubscriptionActor when we are handling the subscription.
* <p/>
* The code performing the logging need not worry about what type of actor is
* currently set so can perform its logging. The resulting log entry though will
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java Fri Feb 7 16:57:49 2014
@@ -42,6 +42,24 @@ public class GenericActor extends Abstra
_defaultMessageLogger = defaultMessageLogger;
}
+ public GenericActor(final String logSubject)
+ {
+ this(new LogSubject()
+ {
+ @Override
+ public String toLogString()
+ {
+ return logSubject;
+ }
+ });
+ }
+
+
+ public GenericActor(LogSubject logSubject)
+ {
+ this(logSubject, CurrentActor.get().getRootMessageLogger());
+ }
+
public GenericActor(LogSubject logSubject, RootMessageLogger rootLogger)
{
super(rootLogger);
@@ -53,6 +71,11 @@ public class GenericActor extends Abstra
return _logSubject.toLogString();
}
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
public static LogActor getInstance(final String logMessage, RootMessageLogger rootLogger)
{
return new GenericActor(new LogSubject()
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java Fri Feb 7 16:57:49 2014
@@ -77,6 +77,6 @@ public interface Session extends Configu
CHANNEL_ID,
PRODUCER_FLOW_BLOCKED));
- Collection<Consumer> getSubscriptions();
+ Collection<Consumer> getConsumers();
Collection<Publisher> getPublishers();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Fri Feb 7 16:57:49 2014
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.model;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
@@ -144,11 +145,11 @@ public interface VirtualHost extends Con
public static interface Transaction
{
- void dequeue(QueueEntry entry);
+ void dequeue(MessageInstance entry);
- void copy(QueueEntry entry, Queue queue);
+ void copy(MessageInstance entry, Queue queue);
- void move(QueueEntry entry, Queue queue);
+ void move(MessageInstance entry, Queue queue);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java Fri Feb 7 16:57:49 2014
@@ -22,33 +22,32 @@ package org.apache.qpid.server.model.ada
import java.util.Map;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import java.security.AccessControlException;
import java.util.Collection;
import java.util.Collections;
-public class ConsumerAdapter extends AbstractAdapter implements Consumer
+public class ConsumerAdapter extends AbstractAdapter implements org.apache.qpid.server.model.Consumer
{
- private final Subscription _subscription;
+ private final Consumer _consumer;
private final QueueAdapter _queue;
private final SessionAdapter _session;
private final ConsumerStatistics _statistics;
public ConsumerAdapter(final QueueAdapter queueAdapter, final SessionAdapter sessionAdapter,
- final Subscription subscription)
+ final Consumer consumer)
{
super(UUIDGenerator.generateConsumerUUID(queueAdapter.getVirtualHost().getName(),
queueAdapter.getName(),
- subscription.getSessionModel().getConnectionModel().getRemoteAddressString(),
- String.valueOf(subscription.getSessionModel().getChannelId()),
- subscription.getConsumerName()), queueAdapter.getTaskExecutor());
- _subscription = subscription;
+ consumer.getSessionModel().getConnectionModel().getRemoteAddressString(),
+ String.valueOf(consumer.getSessionModel().getChannelId()),
+ consumer.getName()), queueAdapter.getTaskExecutor());
+ _consumer = consumer;
_queue = queueAdapter;
_session = sessionAdapter;
_statistics = new ConsumerStatistics();
@@ -57,7 +56,7 @@ public class ConsumerAdapter extends Abs
public String getName()
{
- return _subscription.getConsumerName();
+ return _consumer.getName();
}
public String setName(final String currentName, final String desiredName)
@@ -107,7 +106,7 @@ public class ConsumerAdapter extends Abs
@Override
public Collection<String> getAttributeNames()
{
- return Consumer.AVAILABLE_ATTRIBUTES;
+ return org.apache.qpid.server.model.Consumer.AVAILABLE_ATTRIBUTES;
}
@Override
@@ -147,7 +146,7 @@ public class ConsumerAdapter extends Abs
}
else if(DISTRIBUTION_MODE.equals(name))
{
- return _subscription.acquires() ? "MOVE" : "COPY";
+ return _consumer.acquires() ? "MOVE" : "COPY";
}
else if(SETTLEMENT_MODE.equals(name))
{
@@ -197,11 +196,11 @@ public class ConsumerAdapter extends Abs
{
if(name.equals(BYTES_OUT))
{
- return _subscription.getBytesOut();
+ return _consumer.getBytesOut();
}
else if(name.equals(MESSAGES_OUT))
{
- return _subscription.getMessagesOut();
+ return _consumer.getMessagesOut();
}
else if(name.equals(STATE_CHANGED))
{
@@ -209,11 +208,11 @@ public class ConsumerAdapter extends Abs
}
else if(name.equals(UNACKNOWLEDGED_BYTES))
{
- return _subscription.getUnacknowledgedBytes();
+ return _consumer.getUnacknowledgedBytes();
}
else if(name.equals(UNACKNOWLEDGED_MESSAGES))
{
- return _subscription.getUnacknowledgedMessages();
+ return _consumer.getUnacknowledgedMessages();
}
return null; // TODO - Implement
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java Fri Feb 7 16:57:49 2014
@@ -35,7 +35,6 @@ import org.apache.qpid.server.binding.Bi
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFinder;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -47,10 +46,11 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.*;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.util.MapValueConverter;
-final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener
+final class QueueAdapter extends AbstractAdapter implements Queue,
+ AMQQueue.ConsumerRegistrationListener, AMQQueue.NotificationListener
{
@SuppressWarnings("serial")
static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
@@ -69,8 +69,8 @@ final class QueueAdapter extends Abstrac
private final AMQQueue _queue;
private final Map<Binding, BindingAdapter> _bindingAdapters =
new HashMap<Binding, BindingAdapter>();
- private Map<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter> _consumerAdapters =
- new HashMap<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter>();
+ private Map<Consumer, ConsumerAdapter> _consumerAdapters =
+ new HashMap<Consumer, ConsumerAdapter>();
private final VirtualHostAdapter _vhost;
@@ -84,7 +84,7 @@ final class QueueAdapter extends Abstrac
addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
_queue = queue;
- _queue.addSubscriptionRegistrationListener(this);
+ _queue.addConsumerRegistrationListener(this);
populateConsumers();
_statistics = new QueueStatisticsAdapter(queue);
_queue.setNotificationListener(this);
@@ -124,21 +124,21 @@ final class QueueAdapter extends Abstrac
private void populateConsumers()
{
- Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers();
+ Collection<Consumer> actualConsumers = _queue.getConsumers();
synchronized (_consumerAdapters)
{
- Iterator<org.apache.qpid.server.subscription.Subscription> iter = _consumerAdapters.keySet().iterator();
- for(org.apache.qpid.server.subscription.Subscription subscription : actualSubscriptions)
+ Iterator<Consumer> iter = _consumerAdapters.keySet().iterator();
+ for(Consumer consumer : actualConsumers)
{
- if(!_consumerAdapters.containsKey(subscription))
+ if(!_consumerAdapters.containsKey(consumer))
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
- ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
- _consumerAdapters.put(subscription, adapter);
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
+ ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, consumer);
+ _consumerAdapters.put(consumer, adapter);
if (sessionAdapter != null)
{ // Register ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionRegistered(subscription, adapter);
+ sessionAdapter.consumerRegistered(consumer, adapter);
}
}
}
@@ -153,11 +153,11 @@ final class QueueAdapter extends Abstrac
}
}
- public Collection<Consumer> getConsumers()
+ public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
{
synchronized (_consumerAdapters)
{
- return new ArrayList<Consumer>(_consumerAdapters.values());
+ return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values());
}
}
@@ -502,7 +502,7 @@ final class QueueAdapter extends Abstrac
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
- if(clazz == Consumer.class)
+ if(clazz == org.apache.qpid.server.model.Consumer.class)
{
return (Collection<C>) getConsumers();
}
@@ -587,19 +587,19 @@ final class QueueAdapter extends Abstrac
return _queue;
}
- public void subscriptionRegistered(final AMQQueue queue, final Subscription subscription)
+ public void consumerAdded(final AMQQueue queue, final Consumer consumer)
{
ConsumerAdapter adapter = null;
synchronized (_consumerAdapters)
{
- if(!_consumerAdapters.containsKey(subscription))
+ if(!_consumerAdapters.containsKey(consumer))
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
- adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
- _consumerAdapters.put(subscription, adapter);
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
+ adapter = new ConsumerAdapter(this, sessionAdapter, consumer);
+ _consumerAdapters.put(consumer, adapter);
if (sessionAdapter != null)
{ // Register ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionRegistered(subscription, adapter);
+ sessionAdapter.consumerRegistered(consumer, adapter);
}
}
}
@@ -609,20 +609,20 @@ final class QueueAdapter extends Abstrac
}
}
- public void subscriptionUnregistered(final AMQQueue queue, final Subscription subscription)
+ public void consumerRemoved(final AMQQueue queue, final Consumer consumer)
{
ConsumerAdapter adapter = null;
synchronized (_consumerAdapters)
{
- adapter = _consumerAdapters.remove(subscription);
+ adapter = _consumerAdapters.remove(consumer);
}
if(adapter != null)
{
- SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+ SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
if (sessionAdapter != null)
{ // Unregister ConsumerAdapter with the SessionAdapter.
- sessionAdapter.subscriptionUnregistered(subscription);
+ sessionAdapter.consumerUnregistered(consumer);
}
childRemoved(adapter);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Fri Feb 7 16:57:49 2014
@@ -34,9 +34,8 @@ import org.apache.qpid.server.model.Publ
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -47,7 +46,7 @@ final class SessionAdapter extends Abstr
private AMQSessionModel _session;
private SessionStatistics _statistics;
- private Map<Subscription, ConsumerAdapter> _consumerAdapters = new HashMap<Subscription, ConsumerAdapter>();
+ private Map<Consumer, ConsumerAdapter> _consumerAdapters = new HashMap<Consumer, ConsumerAdapter>();
public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor)
{
@@ -56,11 +55,11 @@ final class SessionAdapter extends Abstr
_statistics = new SessionStatistics();
}
- public Collection<Consumer> getSubscriptions()
+ public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
{
synchronized (_consumerAdapters)
{
- return new ArrayList<Consumer>(_consumerAdapters.values());
+ return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values());
}
}
@@ -119,29 +118,29 @@ final class SessionAdapter extends Abstr
}
/**
- * Register a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
- * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ * Register a ConsumerAdapter with this Session keyed by the Consumer.
+ * @param consumer the org.apache.qpid.server.consumer.Consumer used to key the ConsumerAdapter.
* @param adapter the registered ConsumerAdapter.
*/
- void subscriptionRegistered(Subscription subscription, ConsumerAdapter adapter)
+ void consumerRegistered(Consumer consumer, ConsumerAdapter adapter)
{
synchronized (_consumerAdapters)
{
- _consumerAdapters.put(subscription, adapter);
+ _consumerAdapters.put(consumer, adapter);
}
childAdded(adapter);
}
/**
- * Unregister a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
- * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+ * Unregister a ConsumerAdapter with this Session keyed by the Consumer.
+ * @param consumer the org.apache.qpid.server.consumer.Consumer used to key the ConsumerAdapter.
*/
- void subscriptionUnregistered(Subscription subscription)
+ void consumerUnregistered(Consumer consumer)
{
ConsumerAdapter adapter = null;
synchronized (_consumerAdapters)
{
- adapter = _consumerAdapters.remove(subscription);
+ adapter = _consumerAdapters.remove(consumer);
}
if (adapter != null)
{
@@ -188,9 +187,9 @@ final class SessionAdapter extends Abstr
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
- if(clazz == Consumer.class)
+ if(clazz == org.apache.qpid.server.model.Consumer.class)
{
- return (Collection<C>) getSubscriptions();
+ return (Collection<C>) getConsumers();
}
else if(clazz == Publisher.class)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Fri Feb 7 16:57:49 2014
@@ -44,6 +44,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -759,11 +760,11 @@ public final class VirtualHostAdapter ex
op.withinTransaction(new Transaction()
{
- public void dequeue(final QueueEntry entry)
+ public void dequeue(final MessageInstance entry)
{
if(entry.acquire())
{
- txn.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action()
+ txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action()
{
public void postCommit()
{
@@ -777,7 +778,7 @@ public final class VirtualHostAdapter ex
}
}
- public void copy(QueueEntry entry, Queue queue)
+ public void copy(MessageInstance entry, Queue queue)
{
final ServerMessage message = entry.getMessage();
final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
@@ -788,7 +789,7 @@ public final class VirtualHostAdapter ex
{
try
{
- toQueue.enqueue(message);
+ toQueue.enqueue(message, null);
}
catch(AMQException e)
{
@@ -803,7 +804,7 @@ public final class VirtualHostAdapter ex
}
- public void move(final QueueEntry entry, Queue queue)
+ public void move(final MessageInstance entry, Queue queue)
{
final ServerMessage message = entry.getMessage();
final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
@@ -817,7 +818,7 @@ public final class VirtualHostAdapter ex
{
try
{
- toQueue.enqueue(message);
+ toQueue.enqueue(message, null);
}
catch (AMQException e)
{
@@ -830,7 +831,7 @@ public final class VirtualHostAdapter ex
entry.release();
}
});
- txn.dequeue(entry.getQueue(), message,
+ txn.dequeue(entry.getOwningResource(), message,
new ServerTransaction.Action()
{
Propchange: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
Merged /qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue:r1562456-1565710
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Feb 7 16:57:49 2014
@@ -26,19 +26,20 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.protocol.CapacityChecker;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue
+public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, ExchangeReferrer, BaseQueue,
+ MessageSource<C>, CapacityChecker, MessageDestination
{
- String getName();
public interface NotificationListener
{
@@ -65,45 +66,20 @@ public interface AMQQueue extends Compar
long getTotalEnqueueCount();
- public interface Context
- {
- QueueEntry getLastSeenEntry();
- }
-
void setNoLocal(boolean b);
boolean isAutoDelete();
String getOwner();
- AuthorizationHolder getAuthorizationHolder();
- void setAuthorizationHolder(AuthorizationHolder principalHolder);
-
- void setExclusiveOwningSession(AMQSessionModel owner);
- AMQSessionModel getExclusiveOwningSession();
VirtualHost getVirtualHost();
- void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
-
- void unregisterSubscription(final Subscription subscription) throws AMQException;
-
- Collection<Subscription> getConsumers();
-
- interface SubscriptionRegistrationListener
- {
- void subscriptionRegistered(AMQQueue queue, Subscription subscription);
- void subscriptionUnregistered(AMQQueue queue, Subscription subscription);
- }
-
- void addSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
- void removeSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
-
int getConsumerCount();
int getActiveConsumerCount();
- boolean hasExclusiveSubscriber();
+ boolean hasExclusiveConsumer();
boolean isUnused();
@@ -126,14 +102,14 @@ public interface AMQQueue extends Compar
void requeue(QueueEntry entry);
- void dequeue(QueueEntry entry, Subscription sub);
+ void dequeue(QueueEntry entry, Consumer sub);
void decrementUnackedMsgCount(QueueEntry queueEntry);
- boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
+ boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException;
- void addQueueDeleteTask(final Task task);
- void removeQueueDeleteTask(final Task task);
+ void addQueueDeleteTask(Action<AMQQueue> task);
+ void removeQueueDeleteTask(Action<AMQQueue> task);
@@ -209,16 +185,10 @@ public interface AMQQueue extends Compar
Set<NotificationCheck> getNotificationChecks();
- void flushSubscription(final Subscription sub) throws AMQException;
-
- void deliverAsync(final Subscription sub);
-
void deliverAsync();
void stop();
- boolean isExclusive();
-
Exchange getAlternateExchange();
void setAlternateExchange(Exchange exchange);
@@ -226,56 +196,6 @@ public interface AMQQueue extends Compar
Collection<String> getAvailableAttributes();
Object getAttribute(String attrName);
- void checkCapacity(AMQSessionModel channel);
-
- /**
- * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- static final class ExistingExclusiveSubscription extends AMQException
- {
-
- public ExistingExclusiveSubscription()
- {
- super("");
- }
- }
-
- /**
- * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusive subscription, as a subscription
- * already exists.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create an exclusive subscription, as a subscription already exists.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- *
- * @todo Move to top level, used outside this class.
- */
- static final class ExistingSubscriptionPreventsExclusive extends AMQException
- {
- public ExistingSubscriptionPreventsExclusive()
- {
- super("");
- }
- }
-
- static interface Task
- {
- public void doTask(AMQQueue queue) throws AMQException;
- }
-
void configure(QueueConfiguration config);
void setExclusive(boolean exclusive);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Fri Feb 7 16:57:49 2014
@@ -22,19 +22,15 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.util.Action;
public interface BaseQueue extends TransactionLogResource
{
- public static interface PostEnqueueAction
- {
- public void onEnqueue(QueueEntry entry);
- }
-
- void enqueue(ServerMessage message) throws AMQException;
- void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
- void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException;
+ void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException;
boolean isDurable();
boolean isDeleted();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Fri Feb 7 16:57:49 2014
@@ -43,7 +43,7 @@ public class ConflationQueueList extends
private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
- public ConflationQueueList(AMQQueue queue, String conflationKey)
+ public ConflationQueueList(AMQQueue<QueueConsumer> queue, String conflationKey)
{
super(queue);
_conflationKey = conflationKey;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Fri Feb 7 16:57:49 2014
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
@@ -38,16 +36,16 @@ public abstract class OutOfOrderQueue ex
}
@Override
- protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+ protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
{
- // check that all subscriptions are not in advance of the entry
- SubscriptionList.SubscriptionNodeIterator subIter = getSubscriptionList().iterator();
+ // check that all consumers are not in advance of the entry
+ QueueConsumerList.ConsumerNodeIterator subIter = getConsumerList().iterator();
while(subIter.advance() && !entry.isAcquired())
{
- final Subscription subscription = subIter.getNode().getSubscription();
- if(!subscription.isClosed())
+ final QueueConsumer consumer = subIter.getNode().getConsumer();
+ if(!consumer.isClosed())
{
- QueueContext context = (QueueContext) subscription.getQueueContext();
+ QueueContext context = consumer.getQueueContext();
if(context != null)
{
QueueEntry released = context.getReleasedEntry();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Fri Feb 7 16:57:49 2014
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
{
- private final AMQQueue _queue;
+ private final AMQQueue<QueueConsumer> _queue;
private final PriorityQueueEntrySubList[] _priorityLists;
private final int _priorities;
private final int _priorityOffset;
@@ -46,7 +46,7 @@ public class PriorityQueueList implement
return _priorities;
}
- public AMQQueue getQueue()
+ public AMQQueue<QueueConsumer> getQueue()
{
return _queue;
}
@@ -166,7 +166,7 @@ public class PriorityQueueList implement
{
private int _listPriority;
- public PriorityQueueEntrySubList(AMQQueue queue, int listPriority)
+ public PriorityQueueEntrySubList(AMQQueue<QueueConsumer> queue, int listPriority)
{
super(queue);
_listPriority = listPriority;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java Fri Feb 7 16:57:49 2014
@@ -23,7 +23,7 @@ package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-final class QueueContext implements AMQQueue.Context
+final class QueueContext
{
private volatile QueueEntry _lastSeenEntry;
private volatile QueueEntry _releasedEntry;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Feb 7 16:57:49 2014
@@ -20,207 +20,20 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.ServerTransaction;
-public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
+public interface QueueEntry extends MessageInstance<QueueConsumer>, Comparable<QueueEntry>
{
-
-
- public static enum State
- {
- AVAILABLE,
- ACQUIRED,
- EXPIRED,
- DEQUEUED,
- DELETED;
-
-
- }
-
- public static interface StateChangeListener
- {
- public void stateChanged(QueueEntry entry, State oldSate, State newState);
- }
-
- public abstract class EntryState
- {
- private EntryState()
- {
- }
-
- public abstract State getState();
-
- /**
- * Returns true if state is either DEQUEUED or DELETED.
- *
- * @return true if state is either DEQUEUED or DELETED.
- */
- public boolean isDispensed()
- {
- State currentState = getState();
- return currentState == State.DEQUEUED || currentState == State.DELETED;
- }
- }
-
-
- public final class AvailableState extends EntryState
- {
-
- public State getState()
- {
- return State.AVAILABLE;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class DequeuedState extends EntryState
- {
-
- public State getState()
- {
- return State.DEQUEUED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class DeletedState extends EntryState
- {
-
- public State getState()
- {
- return State.DELETED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
- public final class ExpiredState extends EntryState
- {
-
- public State getState()
- {
- return State.EXPIRED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
-
- public final class NonSubscriptionAcquiredState extends EntryState
- {
- public State getState()
- {
- return State.ACQUIRED;
- }
-
- public String toString()
- {
- return getState().name();
- }
- }
-
- public final class SubscriptionAcquiredState extends EntryState
- {
- private final Subscription _subscription;
-
- public SubscriptionAcquiredState(Subscription subscription)
- {
- _subscription = subscription;
- }
-
-
- public State getState()
- {
- return State.ACQUIRED;
- }
-
- public Subscription getSubscription()
- {
- return _subscription;
- }
-
- public String toString()
- {
- return "{" + getState().name() + " : " + _subscription +"}";
- }
- }
-
-
- final static EntryState AVAILABLE_STATE = new AvailableState();
- final static EntryState DELETED_STATE = new DeletedState();
- final static EntryState DEQUEUED_STATE = new DequeuedState();
- final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
-
-
-
-
- AMQQueue getQueue();
+ AMQQueue<QueueConsumer> getQueue();
long getSize();
- boolean getDeliveredToConsumer();
-
- boolean expired() throws AMQException;
-
- boolean acquire(Subscription sub);
-
- boolean acquiredBySubscription();
- boolean isAcquiredBy(Subscription subscription);
-
- void setRedelivered();
-
- boolean isRedelivered();
-
- Subscription getDeliveredSubscription();
-
- void reject();
-
- boolean isRejectedBy(long subscriptionId);
-
- int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn);
-
boolean isQueueDeleted();
QueueEntry getNextNode();
QueueEntry getNextValidEntry();
- void addStateChangeListener(StateChangeListener listener);
- boolean removeStateChangeListener(StateChangeListener listener);
-
-
- /**
- * Number of times this queue entry has been delivered.
- *
- * @return delivery count
- */
- int getDeliveryCount();
-
- void incrementDeliveryCount();
-
- void decrementDeliveryCount();
-
- Filterable asFilterable();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org