You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/07 17:57:52 UTC

svn commit: r1565726 [1/6] - in /qpid/trunk/qpid/java: ./ amqp-1-0-client-jms/ amqp-1-0-client/ amqp-1-0-common/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrad...

Author: rgodfrey
Date: Fri Feb  7 16:57:49 2014
New Revision: 1565726

URL: http://svn.apache.org/r1565726
Log:
QPID-5504: Refactoring to allow subscribing to nodes other than queues, and sending to nodes other than exchanges (merged from a separate branch)

Added:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/
      - copied from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/consumer/
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageNode.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/plugin/SystemNodeCreator.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/Action.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/util/StateChangeListener.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/
      - copied from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/consumer/
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ConsumerMessagesTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java
      - copied unchanged from r1565710, qpid/branches/java-broker-amqp-1-0-management/java/systests/src/main/java/org/apache/qpid/server/logging/ConsumerLoggingTest.java
Removed:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/SubscriptionActor.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubject.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/SubscriptionActorTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/SubscriptionMessagesTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/SubscriptionLogSubjectTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactory.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImpl.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/SubscriptionFactoryImplTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/logging/SubscriptionLoggingTest.java
Modified:
    qpid/trunk/qpid/java/   (props changed)
    qpid/trunk/qpid/java/amqp-1-0-client/   (props changed)
    qpid/trunk/qpid/java/amqp-1-0-client-jms/   (props changed)
    qpid/trunk/qpid/java/amqp-1-0-common/   (props changed)
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
    qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
    qpid/trunk/qpid/java/broker-core/   (props changed)
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/   (props changed)
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java   (contents, props changed)
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DistributedTransaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/   (props changed)
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/VirtualHostMessagesTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MessageStoreQuotaEventsTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AutoCommitTransactionTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/LocalTransactionTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ExplicitAcceptDispositionChangeListener.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ImplicitAcceptDispositionChangeListener.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageAcceptCompletionListener.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/IncomingMessage.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/TxRollbackHandler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/   (props changed)
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java   (contents, props changed)
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java   (props changed)
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/RollbackOrderTest.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/MaxDeliveryCountTest.java
    qpid/trunk/qpid/java/test-profiles/   (props changed)
    qpid/trunk/qpid/java/test-profiles/Java010Excludes

Propchange: qpid/trunk/qpid/java/
------------------------------------------------------------------------------
  Merged /qpid/branches/java-broker-amqp-1-0-management/java:r1562456-1565710

Propchange: qpid/trunk/qpid/java/amqp-1-0-client/
------------------------------------------------------------------------------
  Merged /qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-client:r1562456-1565710

Propchange: qpid/trunk/qpid/java/amqp-1-0-client-jms/
------------------------------------------------------------------------------
  Merged /qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-client-jms:r1562456-1565710

Propchange: qpid/trunk/qpid/java/amqp-1-0-common/
------------------------------------------------------------------------------
  Merged /qpid/branches/java-broker-amqp-1-0-management/java/amqp-1-0-common:r1562456-1565710

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java Fri Feb  7 16:57:49 2014
@@ -69,7 +69,7 @@ public class MessageMetaDataBinding exte
         buf.position(1);
         buf = buf.slice();
 
-        metaData.writeToBuffer(0, buf);
+        metaData.writeToBuffer(buf);
         tupleOutput.writeInt(bodySize);
         tupleOutput.writeFast(underlying);
     }

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java Fri Feb  7 16:57:49 2014
@@ -73,7 +73,7 @@ public class PreparedTransactionBinding 
             output.writeInt(records.length);
             for(Transaction.Record record : records)
             {
-                UUID id = record.getQueue().getId();
+                UUID id = record.getResource().getId();
                 output.writeLong(id.getMostSignificantBits());
                 output.writeLong(id.getLeastSignificantBits());
                 output.writeLong(record.getMessage().getMessageNumber());
@@ -93,7 +93,7 @@ public class PreparedTransactionBinding 
             _queueId = queueId;
         }
 
-        public TransactionLogResource getQueue()
+        public TransactionLogResource getResource()
         {
             return this;
         }
@@ -119,9 +119,21 @@ public class PreparedTransactionBinding 
         }
 
         @Override
+        public String getName()
+        {
+            return _queueId.toString();
+        }
+
+        @Override
         public UUID getId()
         {
             return _queueId;
         }
+
+        @Override
+        public boolean isDurable()
+        {
+            return true;
+        }
     }
 }

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java Fri Feb  7 16:57:49 2014
@@ -741,7 +741,7 @@ public class UpgradeFrom4To5 extends Abs
             buf.position(1);
             buf = buf.slice();
 
-            metaData.writeToBuffer(0, buf);
+            metaData.writeToBuffer(buf);
             output.writeInt(bodySize);
             output.writeFast(underlying);
         }

Modified: qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (original)
+++ qpid/trunk/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Fri Feb  7 16:57:49 2014
@@ -416,10 +416,22 @@ public class BDBMessageStoreTest extends
         TransactionLogResource mockQueue = new TransactionLogResource()
         {
             @Override
+            public String getName()
+            {
+                return getId().toString();
+            }
+
+            @Override
             public UUID getId()
             {
                 return mockQueueId;
             }
+
+            @Override
+            public boolean isDurable()
+            {
+                return true;
+            }
         };
 
         Transaction txn = log.newTransaction();
@@ -454,10 +466,22 @@ public class BDBMessageStoreTest extends
         TransactionLogResource mockQueue = new TransactionLogResource()
         {
             @Override
+            public String getName()
+            {
+                return getId().toString();
+            }
+
+            @Override
             public UUID getId()
             {
                 return mockQueueId;
             }
+
+            @Override
+            public boolean isDurable()
+            {
+                return true;
+            }
         };
 
         Transaction txn = log.newTransaction();
@@ -511,10 +535,22 @@ public class BDBMessageStoreTest extends
         TransactionLogResource mockQueue = new TransactionLogResource()
         {
             @Override
+            public String getName()
+            {
+                return getId().toString();
+            }
+
+            @Override
             public UUID getId()
             {
                 return mockQueueId;
             }
+
+            @Override
+            public boolean isDurable()
+            {
+                return true;
+            }
         };
 
         Transaction txn = log.newTransaction();

Propchange: qpid/trunk/qpid/java/broker-core/
------------------------------------------------------------------------------
  Merged /qpid/branches/java-broker-amqp-1-0-management/java/broker-core:r1562456-1565710

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Fri Feb  7 16:57:49 2014
@@ -26,6 +26,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -33,14 +34,17 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.BindingLogSubject;
 import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
@@ -66,7 +70,7 @@ public abstract class AbstractExchange i
 
     private VirtualHost _virtualHost;
 
-    private final List<Task> _closeTaskList = new CopyOnWriteArrayList<Task>();
+    private final List<Action<Exchange>> _closeTaskList = new CopyOnWriteArrayList<Action<Exchange>>();
 
     /**
      * Whether the exchange is automatically deleted once all queues have detached from it
@@ -138,6 +142,12 @@ public abstract class AbstractExchange i
 
         if(_closed.compareAndSet(false,true))
         {
+            List<Binding> bindings = new ArrayList<Binding>(_bindings);
+            for(Binding binding : bindings)
+            {
+                removeBinding(binding);
+            }
+
             if(_alternateExchange != null)
             {
                 _alternateExchange.removeReference(this);
@@ -145,9 +155,9 @@ public abstract class AbstractExchange i
 
             CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED());
 
-            for(Task task : _closeTaskList)
+            for(Action<Exchange> task : _closeTaskList)
             {
-                task.onClose(this);
+                task.performAction(this);
             }
             _closeTaskList.clear();
         }
@@ -300,12 +310,12 @@ public abstract class AbstractExchange i
         return !_referrers.isEmpty();
     }
 
-    public void addCloseTask(final Task task)
+    public void addCloseTask(final Action<Exchange> task)
     {
         _closeTaskList.add(task);
     }
 
-    public void removeCloseTask(final Task task)
+    public void removeCloseTask(final Action<Exchange> task)
     {
         _closeTaskList.remove(task);
     }
@@ -421,7 +431,7 @@ public abstract class AbstractExchange i
     public final int send(final ServerMessage message,
                           final InstanceProperties instanceProperties,
                           final ServerTransaction txn,
-                          final BaseQueue.PostEnqueueAction postEnqueueAction)
+                          final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
     {
         List<? extends BaseQueue> queues = route(message, instanceProperties);
 
@@ -579,8 +589,6 @@ public abstract class AbstractExchange i
         {
             doRemoveBinding(b);
             queue.removeBinding(b);
-            removeCloseTask(b);
-            queue.removeQueueDeleteTask(b);
 
             if (b.isDurable())
             {
@@ -659,8 +667,6 @@ public abstract class AbstractExchange i
                 DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
             }
 
-            queue.addQueueDeleteTask(b);
-            addCloseTask(b);
             queue.addBinding(b);
             doAddBinding(b);
             b.logCreation();
@@ -673,7 +679,7 @@ public abstract class AbstractExchange i
         }
     }
 
-    private final class BindingImpl extends Binding implements AMQQueue.Task, Task
+    private final class BindingImpl extends Binding
     {
         private final BindingLogSubject _logSubject;
         //TODO : persist creation time
@@ -689,12 +695,6 @@ public abstract class AbstractExchange i
 
         }
 
-
-        public void doTask(final AMQQueue queue) throws AMQException
-        {
-            removeBinding(this);
-        }
-
         public void onClose(final Exchange exchange) throws AMQSecurityException, AMQInternalException
         {
             removeBinding(this);
@@ -729,11 +729,6 @@ public abstract class AbstractExchange i
 
     }
 
-    public static interface Task
-    {
-        public void onClose(Exchange exchange) throws AMQSecurityException, AMQInternalException;
-    }
-
 
 
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Fri Feb  7 16:57:49 2014
@@ -32,18 +32,22 @@ import org.apache.qpid.AMQInternalExcept
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.logging.messages.ExchangeMessages;
 import org.apache.qpid.server.message.InstanceProperties;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.MessageReference;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.UUIDGenerator;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.txn.ServerTransaction;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class DefaultExchange implements Exchange
@@ -334,7 +338,7 @@ public class DefaultExchange implements 
     public final int send(final ServerMessage message,
                           final InstanceProperties instanceProperties,
                           final ServerTransaction txn,
-                          final BaseQueue.PostEnqueueAction postEnqueueAction)
+                          final Action<MessageInstance<? extends Consumer>> postEnqueueAction)
     {
         final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
         if(q == null)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java Fri Feb  7 16:57:49 2014
@@ -24,20 +24,16 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInternalException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.message.InstanceProperties;
-import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.MessageDestination;
 import org.apache.qpid.server.plugin.ExchangeType;
 import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.BaseQueue;
-import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-public interface Exchange extends ExchangeReferrer
+public interface Exchange extends ExchangeReferrer, MessageDestination
 {
     void initialise(UUID id, VirtualHost host, String name, boolean durable, boolean autoDelete)
             throws AMQException;
@@ -95,19 +91,6 @@ public interface Exchange extends Exchan
     void close() throws AMQException;
 
     /**
-     * Routes a message
-     * @param message the message to be routed
-     * @param instanceProperties the instance properties
-     * @param txn the transaction to enqueue within
-     * @param postEnqueueAction action to perform on the result of every enqueue (may be null)
-     * @return the number of queues in which the message was enqueued performed
-     */
-    int send(ServerMessage message,
-             InstanceProperties instanceProperties,
-             ServerTransaction txn,
-             BaseQueue.PostEnqueueAction postEnqueueAction);
-
-    /**
      * Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
      * @param bindingKey
      * @param arguments

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java Fri Feb  7 16:57:49 2014
@@ -35,7 +35,6 @@ import org.apache.qpid.server.logging.Lo
  * 2) We can set new actors at the point we have enough information. i.e.
  * - Set a low level ConnectionActor when processing bytes from the wire.
  * - Set a ChannelActor when we are processing the frame
- * - Set a SubscriptionActor when we are handling the subscription.
  * <p/>
  * The code performing the logging need not worry about what type of actor is
  * currently set so can perform its logging. The resulting log entry though will

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/actors/GenericActor.java Fri Feb  7 16:57:49 2014
@@ -42,6 +42,24 @@ public class GenericActor extends Abstra
         _defaultMessageLogger = defaultMessageLogger;
     }
 
+    public GenericActor(final String logSubject)
+    {
+        this(new LogSubject()
+        {
+            @Override
+            public String toLogString()
+            {
+                return logSubject;
+            }
+        });
+    }
+
+
+    public GenericActor(LogSubject logSubject)
+    {
+        this(logSubject, CurrentActor.get().getRootMessageLogger());
+    }
+
     public GenericActor(LogSubject logSubject, RootMessageLogger rootLogger)
     {
         super(rootLogger);
@@ -53,6 +71,11 @@ public class GenericActor extends Abstra
         return _logSubject.toLogString();
     }
 
+    public LogSubject getLogSubject()
+    {
+        return _logSubject;
+    }
+
     public static LogActor getInstance(final String logMessage, RootMessageLogger rootLogger)
     {
         return new GenericActor(new LogSubject()

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java Fri Feb  7 16:57:49 2014
@@ -77,6 +77,6 @@ public interface Session extends Configu
                                                              CHANNEL_ID,
                                                              PRODUCER_FLOW_BLOCKED));
 
-    Collection<Consumer> getSubscriptions();
+    Collection<Consumer> getConsumers();
     Collection<Publisher> getPublishers();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java Fri Feb  7 16:57:49 2014
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.model;
 
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.queue.QueueEntry;
 import org.apache.qpid.server.security.SecurityManager;
 import org.apache.qpid.server.store.MessageStore;
@@ -144,11 +145,11 @@ public interface VirtualHost extends Con
 
     public static interface Transaction
     {
-        void dequeue(QueueEntry entry);
+        void dequeue(MessageInstance entry);
 
-        void copy(QueueEntry entry, Queue queue);
+        void copy(MessageInstance entry, Queue queue);
 
-        void move(QueueEntry entry, Queue queue);
+        void move(MessageInstance entry, Queue queue);
 
     }
 

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java Fri Feb  7 16:57:49 2014
@@ -22,33 +22,32 @@ package org.apache.qpid.server.model.ada
 
 import java.util.Map;
 import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.LifetimePolicy;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Statistics;
 import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 
 import java.security.AccessControlException;
 import java.util.Collection;
 import java.util.Collections;
 
-public class ConsumerAdapter extends AbstractAdapter implements Consumer
+public class ConsumerAdapter extends AbstractAdapter implements org.apache.qpid.server.model.Consumer
 {
-    private final Subscription _subscription;
+    private final Consumer _consumer;
     private final QueueAdapter _queue;
     private final SessionAdapter _session;
     private final ConsumerStatistics _statistics;
 
     public ConsumerAdapter(final QueueAdapter queueAdapter, final SessionAdapter sessionAdapter,
-                           final Subscription subscription)
+                           final Consumer consumer)
     {
         super(UUIDGenerator.generateConsumerUUID(queueAdapter.getVirtualHost().getName(),
                                                queueAdapter.getName(),
-                                               subscription.getSessionModel().getConnectionModel().getRemoteAddressString(),
-                                               String.valueOf(subscription.getSessionModel().getChannelId()),
-                                               subscription.getConsumerName()), queueAdapter.getTaskExecutor());
-        _subscription = subscription;
+                                               consumer.getSessionModel().getConnectionModel().getRemoteAddressString(),
+                                               String.valueOf(consumer.getSessionModel().getChannelId()),
+                                               consumer.getName()), queueAdapter.getTaskExecutor());
+        _consumer = consumer;
         _queue = queueAdapter;
         _session = sessionAdapter;
         _statistics = new ConsumerStatistics();
@@ -57,7 +56,7 @@ public class ConsumerAdapter extends Abs
 
     public String getName()
     {
-        return _subscription.getConsumerName();
+        return _consumer.getName();
     }
 
     public String setName(final String currentName, final String desiredName)
@@ -107,7 +106,7 @@ public class ConsumerAdapter extends Abs
     @Override
     public Collection<String> getAttributeNames()
     {
-        return Consumer.AVAILABLE_ATTRIBUTES;
+        return org.apache.qpid.server.model.Consumer.AVAILABLE_ATTRIBUTES;
     }
 
     @Override
@@ -147,7 +146,7 @@ public class ConsumerAdapter extends Abs
         }
         else if(DISTRIBUTION_MODE.equals(name))
         {
-            return _subscription.acquires() ? "MOVE" : "COPY";
+            return _consumer.acquires() ? "MOVE" : "COPY";
         }
         else if(SETTLEMENT_MODE.equals(name))
         {
@@ -197,11 +196,11 @@ public class ConsumerAdapter extends Abs
         {
             if(name.equals(BYTES_OUT))
             {
-                return _subscription.getBytesOut();
+                return _consumer.getBytesOut();
             }
             else if(name.equals(MESSAGES_OUT))
             {
-                return _subscription.getMessagesOut();
+                return _consumer.getMessagesOut();
             }
             else if(name.equals(STATE_CHANGED))
             {
@@ -209,11 +208,11 @@ public class ConsumerAdapter extends Abs
             }
             else if(name.equals(UNACKNOWLEDGED_BYTES))
             {
-                return _subscription.getUnacknowledgedBytes();
+                return _consumer.getUnacknowledgedBytes();
             }
             else if(name.equals(UNACKNOWLEDGED_MESSAGES))
             {
-                return _subscription.getUnacknowledgedMessages();
+                return _consumer.getUnacknowledgedMessages();
             }
             return null;  // TODO - Implement
         }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java Fri Feb  7 16:57:49 2014
@@ -35,7 +35,6 @@ import org.apache.qpid.server.binding.Bi
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.ConfiguredObjectFinder;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.Exchange;
 import org.apache.qpid.server.model.IllegalStateTransitionException;
 import org.apache.qpid.server.model.LifetimePolicy;
@@ -47,10 +46,11 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.queue.*;
 import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.util.MapValueConverter;
 
-final class QueueAdapter extends AbstractAdapter implements Queue, AMQQueue.SubscriptionRegistrationListener, AMQQueue.NotificationListener
+final class QueueAdapter extends AbstractAdapter implements Queue,
+                                                            AMQQueue.ConsumerRegistrationListener, AMQQueue.NotificationListener
 {
     @SuppressWarnings("serial")
     static final Map<String, Type> ATTRIBUTE_TYPES = Collections.unmodifiableMap(new HashMap<String, Type>(){{
@@ -69,8 +69,8 @@ final class QueueAdapter extends Abstrac
     private final AMQQueue _queue;
     private final Map<Binding, BindingAdapter> _bindingAdapters =
             new HashMap<Binding, BindingAdapter>();
-    private Map<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter> _consumerAdapters =
-            new HashMap<org.apache.qpid.server.subscription.Subscription, ConsumerAdapter>();
+    private Map<Consumer, ConsumerAdapter> _consumerAdapters =
+            new HashMap<Consumer, ConsumerAdapter>();
 
 
     private final VirtualHostAdapter _vhost;
@@ -84,7 +84,7 @@ final class QueueAdapter extends Abstrac
         addParent(org.apache.qpid.server.model.VirtualHost.class, virtualHostAdapter);
 
         _queue = queue;
-        _queue.addSubscriptionRegistrationListener(this);
+        _queue.addConsumerRegistrationListener(this);
         populateConsumers();
         _statistics = new QueueStatisticsAdapter(queue);
         _queue.setNotificationListener(this);
@@ -124,21 +124,21 @@ final class QueueAdapter extends Abstrac
 
     private void populateConsumers()
     {
-        Collection<org.apache.qpid.server.subscription.Subscription> actualSubscriptions = _queue.getConsumers();
+        Collection<Consumer> actualConsumers = _queue.getConsumers();
 
         synchronized (_consumerAdapters)
         {
-            Iterator<org.apache.qpid.server.subscription.Subscription> iter = _consumerAdapters.keySet().iterator();
-            for(org.apache.qpid.server.subscription.Subscription subscription : actualSubscriptions)
+            Iterator<Consumer> iter = _consumerAdapters.keySet().iterator();
+            for(Consumer consumer : actualConsumers)
             {
-                if(!_consumerAdapters.containsKey(subscription))
+                if(!_consumerAdapters.containsKey(consumer))
                 {
-                    SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
-                    ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
-                    _consumerAdapters.put(subscription, adapter);
+                    SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
+                    ConsumerAdapter adapter = new ConsumerAdapter(this, sessionAdapter, consumer);
+                    _consumerAdapters.put(consumer, adapter);
                     if (sessionAdapter != null)
                     { // Register ConsumerAdapter with the SessionAdapter.
-                        sessionAdapter.subscriptionRegistered(subscription, adapter);
+                        sessionAdapter.consumerRegistered(consumer, adapter);
                     }
                 }
             }
@@ -153,11 +153,11 @@ final class QueueAdapter extends Abstrac
         }
     }
 
-    public Collection<Consumer> getConsumers()
+    public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
     {
         synchronized (_consumerAdapters)
         {
-            return new ArrayList<Consumer>(_consumerAdapters.values());
+            return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values());
         }
 
     }
@@ -502,7 +502,7 @@ final class QueueAdapter extends Abstrac
     @Override
     public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
     {
-        if(clazz == Consumer.class)
+        if(clazz == org.apache.qpid.server.model.Consumer.class)
         {
             return (Collection<C>) getConsumers();
         }
@@ -587,19 +587,19 @@ final class QueueAdapter extends Abstrac
         return _queue;
     }
 
-    public void subscriptionRegistered(final AMQQueue queue, final Subscription subscription)
+    public void consumerAdded(final AMQQueue queue, final Consumer consumer)
     {
         ConsumerAdapter adapter = null;
         synchronized (_consumerAdapters)
         {
-            if(!_consumerAdapters.containsKey(subscription))
+            if(!_consumerAdapters.containsKey(consumer))
             {
-                SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
-                adapter = new ConsumerAdapter(this, sessionAdapter, subscription);
-                _consumerAdapters.put(subscription, adapter);
+                SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
+                adapter = new ConsumerAdapter(this, sessionAdapter, consumer);
+                _consumerAdapters.put(consumer, adapter);
                 if (sessionAdapter != null)
                 { // Register ConsumerAdapter with the SessionAdapter.
-                    sessionAdapter.subscriptionRegistered(subscription, adapter);
+                    sessionAdapter.consumerRegistered(consumer, adapter);
                 }
             }
         }
@@ -609,20 +609,20 @@ final class QueueAdapter extends Abstrac
         }
     }
 
-    public void subscriptionUnregistered(final AMQQueue queue, final Subscription subscription)
+    public void consumerRemoved(final AMQQueue queue, final Consumer consumer)
     {
         ConsumerAdapter adapter = null;
 
         synchronized (_consumerAdapters)
         {
-            adapter = _consumerAdapters.remove(subscription);
+            adapter = _consumerAdapters.remove(consumer);
         }
         if(adapter != null)
         {
-            SessionAdapter sessionAdapter = getSessionAdapter(subscription.getSessionModel());
+            SessionAdapter sessionAdapter = getSessionAdapter(consumer.getSessionModel());
             if (sessionAdapter != null)
             { // Unregister ConsumerAdapter with the SessionAdapter.
-                sessionAdapter.subscriptionUnregistered(subscription);
+                sessionAdapter.consumerUnregistered(consumer);
             }
             childRemoved(adapter);
         }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java Fri Feb  7 16:57:49 2014
@@ -34,9 +34,8 @@ import org.apache.qpid.server.model.Publ
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Statistics;
-import org.apache.qpid.server.model.Consumer;
 import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.consumer.Consumer;
 import org.apache.qpid.server.configuration.updater.TaskExecutor;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 
@@ -47,7 +46,7 @@ final class SessionAdapter extends Abstr
 
     private AMQSessionModel _session;
     private SessionStatistics _statistics;
-    private Map<Subscription, ConsumerAdapter> _consumerAdapters = new HashMap<Subscription, ConsumerAdapter>();
+    private Map<Consumer, ConsumerAdapter> _consumerAdapters = new HashMap<Consumer, ConsumerAdapter>();
 
     public SessionAdapter(final AMQSessionModel session, TaskExecutor taskExecutor)
     {
@@ -56,11 +55,11 @@ final class SessionAdapter extends Abstr
         _statistics = new SessionStatistics();
     }
 
-    public Collection<Consumer> getSubscriptions()
+    public Collection<org.apache.qpid.server.model.Consumer> getConsumers()
     {
         synchronized (_consumerAdapters)
         {
-            return new ArrayList<Consumer>(_consumerAdapters.values());
+            return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values());
         }
     }
 
@@ -119,29 +118,29 @@ final class SessionAdapter extends Abstr
     }
 
     /**
-     * Register a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
-     * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+     * Register a ConsumerAdapter with this Session keyed by the Consumer.
+     * @param consumer the org.apache.qpid.server.consumer.Consumer used to key the ConsumerAdapter.
      * @param adapter the registered ConsumerAdapter.
      */
-    void subscriptionRegistered(Subscription subscription, ConsumerAdapter adapter)
+    void consumerRegistered(Consumer consumer, ConsumerAdapter adapter)
     {
         synchronized (_consumerAdapters)
         {
-            _consumerAdapters.put(subscription, adapter);
+            _consumerAdapters.put(consumer, adapter);
         }
         childAdded(adapter);
     }
 
     /**
-     * Unregister a ConsumerAdapter (Subscription) with this Session keyed by the Subscription.
-     * @param subscription the org.apache.qpid.server.subscription.Subscription used to key the ConsumerAdapter.
+     * Unregister a ConsumerAdapter with this Session keyed by the Consumer.
+     * @param consumer the org.apache.qpid.server.consumer.Consumer used to key the ConsumerAdapter.
      */
-    void subscriptionUnregistered(Subscription subscription)
+    void consumerUnregistered(Consumer consumer)
     {
         ConsumerAdapter adapter = null;
         synchronized (_consumerAdapters)
         {
-            adapter = _consumerAdapters.remove(subscription);
+            adapter = _consumerAdapters.remove(consumer);
         }
         if (adapter != null)
         {
@@ -188,9 +187,9 @@ final class SessionAdapter extends Abstr
     @Override
     public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
     {
-        if(clazz == Consumer.class)
+        if(clazz == org.apache.qpid.server.model.Consumer.class)
         {
-            return (Collection<C>) getSubscriptions();
+            return (Collection<C>) getConsumers();
         }
         else if(clazz == Publisher.class)
         {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java Fri Feb  7 16:57:49 2014
@@ -44,6 +44,7 @@ import org.apache.qpid.AMQException;
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.configuration.XmlConfigurationUtilities.MyConfiguration;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
@@ -759,11 +760,11 @@ public final class VirtualHostAdapter ex
 
         op.withinTransaction(new Transaction()
         {
-            public void dequeue(final QueueEntry entry)
+            public void dequeue(final MessageInstance entry)
             {
                 if(entry.acquire())
                 {
-                    txn.dequeue(entry.getQueue(), entry.getMessage(), new ServerTransaction.Action()
+                    txn.dequeue(entry.getOwningResource(), entry.getMessage(), new ServerTransaction.Action()
                     {
                         public void postCommit()
                         {
@@ -777,7 +778,7 @@ public final class VirtualHostAdapter ex
                 }
             }
 
-            public void copy(QueueEntry entry, Queue queue)
+            public void copy(MessageInstance entry, Queue queue)
             {
                 final ServerMessage message = entry.getMessage();
                 final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
@@ -788,7 +789,7 @@ public final class VirtualHostAdapter ex
                     {
                         try
                         {
-                            toQueue.enqueue(message);
+                            toQueue.enqueue(message, null);
                         }
                         catch(AMQException e)
                         {
@@ -803,7 +804,7 @@ public final class VirtualHostAdapter ex
 
             }
 
-            public void move(final QueueEntry entry, Queue queue)
+            public void move(final MessageInstance entry, Queue queue)
             {
                 final ServerMessage message = entry.getMessage();
                 final AMQQueue toQueue = ((QueueAdapter)queue).getAMQQueue();
@@ -817,7 +818,7 @@ public final class VirtualHostAdapter ex
                                     {
                                         try
                                         {
-                                            toQueue.enqueue(message);
+                                            toQueue.enqueue(message, null);
                                         }
                                         catch (AMQException e)
                                         {
@@ -830,7 +831,7 @@ public final class VirtualHostAdapter ex
                                         entry.release();
                                     }
                                 });
-                    txn.dequeue(entry.getQueue(), message,
+                    txn.dequeue(entry.getOwningResource(), message,
                                 new ServerTransaction.Action()
                                 {
 

Propchange: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
  Merged /qpid/branches/java-broker-amqp-1-0-management/java/broker-core/src/main/java/org/apache/qpid/server/queue:r1562456-1565710

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Feb  7 16:57:49 2014
@@ -26,19 +26,20 @@ import org.apache.qpid.server.configurat
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.exchange.ExchangeReferrer;
 import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.message.MessageDestination;
+import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.protocol.CapacityChecker;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Collection;
 import java.util.List;
 import java.util.Set;
 
-public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue
+public interface AMQQueue<C extends Consumer> extends Comparable<AMQQueue<C>>, ExchangeReferrer, BaseQueue,
+                                                      MessageSource<C>, CapacityChecker, MessageDestination
 {
-    String getName();
 
     public interface NotificationListener
     {
@@ -65,45 +66,20 @@ public interface AMQQueue extends Compar
 
     long getTotalEnqueueCount();
 
-    public interface Context
-    {
-        QueueEntry getLastSeenEntry();
-    }
-
     void setNoLocal(boolean b);
 
     boolean isAutoDelete();
 
     String getOwner();
-    AuthorizationHolder getAuthorizationHolder();
-    void setAuthorizationHolder(AuthorizationHolder principalHolder);
-
-    void setExclusiveOwningSession(AMQSessionModel owner);
-    AMQSessionModel getExclusiveOwningSession();
 
     VirtualHost getVirtualHost();
 
-    void registerSubscription(final Subscription subscription, final boolean exclusive) throws AMQException;
-
-    void unregisterSubscription(final Subscription subscription) throws AMQException;
-
-    Collection<Subscription> getConsumers();
-
-    interface SubscriptionRegistrationListener
-    {
-        void subscriptionRegistered(AMQQueue queue, Subscription subscription);
-        void subscriptionUnregistered(AMQQueue queue, Subscription subscription);
-    }
-
-    void addSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
-    void removeSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
-
 
     int getConsumerCount();
 
     int getActiveConsumerCount();
 
-    boolean hasExclusiveSubscriber();
+    boolean hasExclusiveConsumer();
 
     boolean isUnused();
 
@@ -126,14 +102,14 @@ public interface AMQQueue extends Compar
 
     void requeue(QueueEntry entry);
 
-    void dequeue(QueueEntry entry, Subscription sub);
+    void dequeue(QueueEntry entry, Consumer sub);
 
     void decrementUnackedMsgCount(QueueEntry queueEntry);
 
-    boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
+    boolean resend(final QueueEntry entry, final Consumer consumer) throws AMQException;
 
-    void addQueueDeleteTask(final Task task);
-    void removeQueueDeleteTask(final Task task);
+    void addQueueDeleteTask(Action<AMQQueue> task);
+    void removeQueueDeleteTask(Action<AMQQueue> task);
 
 
 
@@ -209,16 +185,10 @@ public interface AMQQueue extends Compar
 
     Set<NotificationCheck> getNotificationChecks();
 
-    void flushSubscription(final Subscription sub) throws AMQException;
-
-    void deliverAsync(final Subscription sub);
-
     void deliverAsync();
 
     void stop();
 
-    boolean isExclusive();
-
     Exchange getAlternateExchange();
 
     void setAlternateExchange(Exchange exchange);
@@ -226,56 +196,6 @@ public interface AMQQueue extends Compar
     Collection<String> getAvailableAttributes();
     Object getAttribute(String attrName);
 
-    void checkCapacity(AMQSessionModel channel);
-
-    /**
-     * ExistingExclusiveSubscription signals a failure to create a subscription, because an exclusive subscription
-     * already exists.
-     *
-     * <p/><table id="crc"><caption>CRC Card</caption>
-     * <tr><th> Responsibilities <th> Collaborations
-     * <tr><td> Represent failure to create a subscription, because an exclusive subscription already exists.
-     * </table>
-     *
-     * @todo Not an AMQP exception as no status code.
-     *
-     * @todo Move to top level, used outside this class.
-     */
-    static final class ExistingExclusiveSubscription extends AMQException
-    {
-
-        public ExistingExclusiveSubscription()
-        {
-            super("");
-        }
-    }
-
-    /**
-     * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusive subscription, as a subscription
-     * already exists.
-     *
-     * <p/><table id="crc"><caption>CRC Card</caption>
-     * <tr><th> Responsibilities <th> Collaborations
-     * <tr><td> Represent failure to create an exclusive subscription, as a subscription already exists.
-     * </table>
-     *
-     * @todo Not an AMQP exception as no status code.
-     *
-     * @todo Move to top level, used outside this class.
-     */
-    static final class ExistingSubscriptionPreventsExclusive extends AMQException
-    {
-        public ExistingSubscriptionPreventsExclusive()
-        {
-            super("");
-        }
-    }
-
-    static interface Task
-    {
-        public void doTask(AMQQueue queue) throws AMQException;
-    }
-
     void configure(QueueConfiguration config);
 
     void setExclusive(boolean exclusive);

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java Fri Feb  7 16:57:49 2014
@@ -22,19 +22,15 @@
 package org.apache.qpid.server.queue;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.server.consumer.Consumer;
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.util.Action;
 
 public interface BaseQueue extends TransactionLogResource
 {
-    public static interface PostEnqueueAction
-    {
-        public void onEnqueue(QueueEntry entry);
-    }
-
-    void enqueue(ServerMessage message) throws AMQException;
-    void enqueue(ServerMessage message, PostEnqueueAction action) throws AMQException;
-    void enqueue(ServerMessage message, boolean transactional, PostEnqueueAction action) throws AMQException;
+    void enqueue(ServerMessage message, Action<MessageInstance<? extends Consumer>> action) throws AMQException;
 
     boolean isDurable();
     boolean isDeleted();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Fri Feb  7 16:57:49 2014
@@ -43,7 +43,7 @@ public class ConflationQueueList extends
     private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
     private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
 
-    public ConflationQueueList(AMQQueue queue, String conflationKey)
+    public ConflationQueueList(AMQQueue<QueueConsumer> queue, String conflationKey)
     {
         super(queue);
         _conflationKey = conflationKey;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java Fri Feb  7 16:57:49 2014
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.subscription.SubscriptionList;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.Map;
@@ -38,16 +36,16 @@ public abstract class OutOfOrderQueue ex
     }
 
     @Override
-    protected void checkSubscriptionsNotAheadOfDelivery(final QueueEntry entry)
+    protected void checkConsumersNotAheadOfDelivery(final QueueEntry entry)
     {
-        // check that all subscriptions are not in advance of the entry
-        SubscriptionList.SubscriptionNodeIterator subIter = getSubscriptionList().iterator();
+        // check that all consumers are not in advance of the entry
+        QueueConsumerList.ConsumerNodeIterator subIter = getConsumerList().iterator();
         while(subIter.advance() && !entry.isAcquired())
         {
-            final Subscription subscription = subIter.getNode().getSubscription();
-            if(!subscription.isClosed())
+            final QueueConsumer consumer = subIter.getNode().getConsumer();
+            if(!consumer.isClosed())
             {
-                QueueContext context = (QueueContext) subscription.getQueueContext();
+                QueueContext context = consumer.getQueueContext();
                 if(context != null)
                 {
                     QueueEntry released = context.getReleasedEntry();

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Fri Feb  7 16:57:49 2014
@@ -24,7 +24,7 @@ import org.apache.qpid.server.message.Se
 
 public class PriorityQueueList implements QueueEntryList<SimpleQueueEntryImpl>
 {
-    private final AMQQueue _queue;
+    private final AMQQueue<QueueConsumer> _queue;
     private final PriorityQueueEntrySubList[] _priorityLists;
     private final int _priorities;
     private final int _priorityOffset;
@@ -46,7 +46,7 @@ public class PriorityQueueList implement
         return _priorities;
     }
 
-    public AMQQueue getQueue()
+    public AMQQueue<QueueConsumer> getQueue()
     {
         return _queue;
     }
@@ -166,7 +166,7 @@ public class PriorityQueueList implement
     {
         private int _listPriority;
 
-        public PriorityQueueEntrySubList(AMQQueue queue, int listPriority)
+        public PriorityQueueEntrySubList(AMQQueue<QueueConsumer> queue, int listPriority)
         {
             super(queue);
             _listPriority = listPriority;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java Fri Feb  7 16:57:49 2014
@@ -23,7 +23,7 @@ package org.apache.qpid.server.queue;
 
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-final class QueueContext implements AMQQueue.Context
+final class QueueContext
 {
     private volatile QueueEntry _lastSeenEntry;
     private volatile QueueEntry _releasedEntry;

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java?rev=1565726&r1=1565725&r2=1565726&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java Fri Feb  7 16:57:49 2014
@@ -20,207 +20,20 @@
 */
 package org.apache.qpid.server.queue;
 
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.filter.Filterable;
 import org.apache.qpid.server.message.MessageInstance;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.txn.ServerTransaction;
 
-public interface QueueEntry extends MessageInstance, Comparable<QueueEntry>
+public interface QueueEntry extends MessageInstance<QueueConsumer>, Comparable<QueueEntry>
 {
 
-
-
-    public static enum State
-    {
-        AVAILABLE,
-        ACQUIRED,
-        EXPIRED,
-        DEQUEUED,
-        DELETED;
-
-
-    }
-
-    public static interface StateChangeListener
-    {
-        public void stateChanged(QueueEntry entry, State oldSate, State newState);
-    }
-
-    public abstract class EntryState
-    {
-        private EntryState()
-        {
-        }
-
-        public abstract State getState();
-
-        /**
-         * Returns true if state is either DEQUEUED or DELETED.
-         *
-         * @return true if state is either DEQUEUED or DELETED.
-         */
-        public boolean isDispensed()
-        {
-            State currentState = getState();
-            return currentState == State.DEQUEUED || currentState == State.DELETED;
-        }
-    }
-
-
-    public final class AvailableState extends EntryState
-    {
-
-        public State getState()
-        {
-            return State.AVAILABLE;
-        }
-
-        public String toString()
-        {
-            return getState().name();
-        }
-    }
-
-
-    public final class DequeuedState extends EntryState
-    {
-
-        public State getState()
-        {
-            return State.DEQUEUED;
-        }
-
-        public String toString()
-        {
-            return getState().name();
-        }
-    }
-
-
-    public final class DeletedState extends EntryState
-    {
-
-        public State getState()
-        {
-            return State.DELETED;
-        }
-
-        public String toString()
-        {
-            return getState().name();
-        }
-    }
-
-    public final class ExpiredState extends EntryState
-    {
-
-        public State getState()
-        {
-            return State.EXPIRED;
-        }
-
-        public String toString()
-        {
-            return getState().name();
-        }
-    }
-
-
-    public final class NonSubscriptionAcquiredState extends EntryState
-    {
-        public State getState()
-        {
-            return State.ACQUIRED;
-        }
-
-        public String toString()
-        {
-            return getState().name();
-        }
-    }
-
-    public final class SubscriptionAcquiredState extends EntryState
-    {
-        private final Subscription _subscription;
-
-        public SubscriptionAcquiredState(Subscription subscription)
-        {
-            _subscription = subscription;
-        }
-
-
-        public State getState()
-        {
-            return State.ACQUIRED;
-        }
-
-        public Subscription getSubscription()
-        {
-            return _subscription;
-        }
-
-        public String toString()
-        {
-            return "{" + getState().name() + " : " + _subscription +"}";
-        }
-    }
-
-
-    final static EntryState AVAILABLE_STATE = new AvailableState();
-    final static EntryState DELETED_STATE = new DeletedState();
-    final static EntryState DEQUEUED_STATE = new DequeuedState();
-    final static EntryState NON_SUBSCRIPTION_ACQUIRED_STATE = new NonSubscriptionAcquiredState();
-
-
-
-
-    AMQQueue getQueue();
+    AMQQueue<QueueConsumer> getQueue();
 
     long getSize();
 
-    boolean getDeliveredToConsumer();
-
-    boolean expired() throws AMQException;
-
-    boolean acquire(Subscription sub);
-
-    boolean acquiredBySubscription();
-    boolean isAcquiredBy(Subscription subscription);
-
-    void setRedelivered();
-
-    boolean isRedelivered();
-
-    Subscription getDeliveredSubscription();
-
-    void reject();
-
-    boolean isRejectedBy(long subscriptionId);
-
-    int routeToAlternate(final BaseQueue.PostEnqueueAction action, ServerTransaction txn);
-
     boolean isQueueDeleted();
 
     QueueEntry getNextNode();
 
     QueueEntry getNextValidEntry();
 
-    void addStateChangeListener(StateChangeListener listener);
-    boolean removeStateChangeListener(StateChangeListener listener);
-
-
-    /**
-     * Number of times this queue entry has been delivered.
-     *
-     * @return delivery count
-     */
-    int getDeliveryCount();
-
-    void incrementDeliveryCount();
-
-    void decrementDeliveryCount();
-
-    Filterable asFilterable();
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org