You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/02/27 00:27:43 UTC
svn commit: r1572343 [1/7] - in /qpid/trunk/qpid/java:
broker-core/src/main/java/org/apache/qpid/server/binding/
broker-core/src/main/java/org/apache/qpid/server/consumer/
broker-core/src/main/java/org/apache/qpid/server/exchange/
broker-core/src/main/...
Author: rgodfrey
Date: Wed Feb 26 23:27:39 2014
New Revision: 1572343
URL: http://svn.apache.org/r1572343
Log:
QPID-5577 : [Java Broker] Change Exchange, Queue, Binding, Consumer to implement ConfiguredObject and remove adapter classes
Added:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
- copied, changed from r1571510, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
- copied, changed from r1571506, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/NonDefaultExchange.java
Removed:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Statistics.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BindingAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConsumerAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ExchangeAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/QueueAdapter.java
Modified:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/topic/TopicExchangeResult.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/BindingLogSubject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AccessControlProvider.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AuthenticationProvider.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Consumer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Exchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Group.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/GroupMember.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/GroupProvider.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/KeyStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Plugin.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/PreferencesProvider.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/TrustStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/User.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHostAlias.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractKeyStoreAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AbstractPluginAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AccessControlProviderAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AuthenticationProviderAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProvider.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/GroupProviderAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/VirtualHostAliasAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/MessageGroupManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java (contents, props changed)
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OrderedQueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueContext.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryIterator.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListBase.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryListFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryListFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/ObjectProperties.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/messages/ExchangeMessagesTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/AbstractTestLogSubject.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/ExchangeLogSubjectTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConflationQueueListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/NotificationCheckTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryImplTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SelfValidatingSortedQueueEntryList.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleQueueEntryImplTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/AbstractDurableConfigurationStoreTestCase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/StandardVirtualHostTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueBindHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueUnbindHandler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java
qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java
qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/ExchangeMBean.java
qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
qpid/trunk/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBeanTest.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/MessageStoreTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/AuthenticationProviderRestTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestHttpsTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/ConnectionRestTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PreferencesProviderRestTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QueueRestTest.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java (from r1571510, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java&r1=1571510&r2=1572343&rev=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java Wed Feb 26 23:27:39 2014
@@ -20,11 +20,14 @@
*/
package org.apache.qpid.server.binding;
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeImpl;
+import org.apache.qpid.server.exchange.NonDefaultExchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
+import org.apache.qpid.server.model.Binding;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
@@ -42,26 +45,28 @@ import java.util.concurrent.CopyOnWriteA
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-public class Binding
+public class BindingImpl
+ extends AbstractConfiguredObject<BindingImpl>
+ implements org.apache.qpid.server.model.Binding<BindingImpl>
{
private final String _bindingKey;
private final AMQQueue _queue;
- private final Exchange _exchange;
+ private final NonDefaultExchange _exchange;
private final Map<String, Object> _arguments;
private final UUID _id;
private final AtomicLong _matches = new AtomicLong();
private final BindingLogSubject _logSubject;
final AtomicBoolean _deleted = new AtomicBoolean();
- final CopyOnWriteArrayList<StateChangeListener<Binding,State>> _stateChangeListeners =
- new CopyOnWriteArrayList<StateChangeListener<Binding, State>>();
+ final CopyOnWriteArrayList<StateChangeListener<BindingImpl,State>> _stateChangeListeners =
+ new CopyOnWriteArrayList<StateChangeListener<BindingImpl, State>>();
- public Binding(UUID id,
- final String bindingKey,
- final AMQQueue queue,
- final Exchange exchange,
- final Map<String, Object> arguments)
+ public BindingImpl(UUID id,
+ final String bindingKey,
+ final AMQQueue queue,
+ final NonDefaultExchange exchange,
+ final Map<String, Object> arguments)
{
this(id, convertToAttributes(bindingKey, arguments), queue, exchange);
}
@@ -77,8 +82,9 @@ public class Binding
return attributes;
}
- public Binding(UUID id, Map<String,Object> attributes, AMQQueue queue, Exchange exchange)
+ public BindingImpl(UUID id, Map<String, Object> attributes, AMQQueue queue, NonDefaultExchange exchange)
{
+ super(id,Collections.EMPTY_MAP,attributes,queue.getVirtualHost().getTaskExecutor());
_id = id;
_bindingKey = (String)attributes.get(org.apache.qpid.server.model.Binding.NAME);
_queue = queue;
@@ -96,10 +102,6 @@ public class Binding
}
- public UUID getId()
- {
- return _id;
- }
public String getBindingKey()
{
@@ -111,11 +113,17 @@ public class Binding
return _queue;
}
- public Exchange getExchangeImpl()
+ @Override
+ public Queue getQueue()
{
- return _exchange;
+ return _queue;
}
+ @Override
+ public NonDefaultExchange getExchange()
+ {
+ return _exchange;
+ }
public Map<String, Object> getArguments()
{
@@ -137,10 +145,34 @@ public class Binding
return _queue.isDurable() && _exchange.isDurable();
}
+ @Override
+ public void setDurable(final boolean durable)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ if(durable != isDurable())
+ {
+ throw new IllegalArgumentException("Cannot change the durability of a binding");
+ }
+ }
+
public LifetimePolicy getLifetimePolicy()
{
- return LifetimePolicy.IN_USE;
+ return LifetimePolicy.PERMANENT;
+ }
+
+ @Override
+ public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ // TODO
+ return null;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
+ {
+ return Collections.emptySet();
}
@Override
@@ -151,15 +183,15 @@ public class Binding
return true;
}
- if (!(o instanceof Binding))
+ if (!(o instanceof BindingImpl))
{
return false;
}
- final Binding binding = (Binding) o;
+ final BindingImpl binding = (BindingImpl) o;
return (_bindingKey == null ? binding.getBindingKey() == null : _bindingKey.equals(binding.getBindingKey()))
- && (_exchange == null ? binding.getExchangeImpl() == null : _exchange.equals(binding.getExchangeImpl()))
+ && (_exchange == null ? binding.getExchange() == null : _exchange.equals(binding.getExchange()))
&& (_queue == null ? binding.getAMQQueue() == null : _queue.equals(binding.getAMQQueue()));
}
@@ -194,7 +226,7 @@ public class Binding
{
if(_deleted.compareAndSet(false,true))
{
- for(StateChangeListener<Binding,State> listener : _stateChangeListeners)
+ for(StateChangeListener<BindingImpl,State> listener : _stateChangeListeners)
{
listener.stateChanged(this, State.ACTIVE, State.DELETED);
}
@@ -207,18 +239,90 @@ public class Binding
return _bindingKey;
}
+ @Override
+ public String setName(final String currentName, final String desiredName)
+ throws IllegalStateException, AccessControlException
+ {
+ // TODO
+ return null;
+ }
+
public State getState()
{
return _deleted.get() ? State.DELETED : State.ACTIVE;
}
- public void addStateChangeListener(StateChangeListener<Binding,State> listener)
+ public void addStateChangeListener(StateChangeListener<BindingImpl,State> listener)
{
_stateChangeListeners.add(listener);
}
- public void removeStateChangeListener(StateChangeListener<Binding,State> listener)
+ public void removeStateChangeListener(StateChangeListener<BindingImpl,State> listener)
{
_stateChangeListeners.remove(listener);
}
+
+ @Override
+ public Object getAttribute(final String name)
+ {
+ if(ID.equals(name))
+ {
+ return getId();
+ }
+ else if(NAME.equals(name))
+ {
+ return _bindingKey;
+ }
+ else if(DURABLE.equals(name))
+ {
+ return isDurable();
+ }
+ else if(LIFETIME_POLICY.equals(name))
+ {
+ return getLifetimePolicy();
+ }
+ else if(QUEUE.equals(name))
+ {
+ return _queue;
+ }
+ else if(EXCHANGE.equals(name))
+ {
+ return _exchange;
+ }
+ return super.getAttribute(name);
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return getAttributeNames(Binding.class);
+ }
+
+ @Override
+ public <T extends ConfiguredObject> T getParent(final Class<T> clazz)
+ {
+ if(clazz == Exchange.class)
+ {
+ return (T) getExchange();
+ }
+ else if(clazz == Queue.class)
+ {
+ return (T) getQueue();
+ }
+ return super.getParent(clazz);
+ }
+
+ @Override
+ public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException,
+ AccessControlException, IllegalArgumentException
+ {
+ throw new UnsupportedOperationException("Changing attributes on binding is not supported.");
+ }
+
+ @Override
+ public void setAttributes(final Map<String, Object> attributes) throws IllegalStateException, AccessControlException,
+ IllegalArgumentException
+ {
+ throw new UnsupportedOperationException("Changing attributes on binding is not supported.");
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java Wed Feb 26 23:27:39 2014
@@ -27,7 +27,7 @@ import org.apache.qpid.server.protocol.A
public interface Consumer
{
- AtomicLong SUB_ID_GENERATOR = new AtomicLong(0);
+ AtomicLong CONSUMER_NUMBER_GENERATOR = new AtomicLong(0);
void externalStateChange();
@@ -52,7 +52,7 @@ public interface Consumer
MessageSource getMessageSource();
- long getId();
+ long getConsumerNumber();
boolean isSuspended();
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Wed Feb 26 23:27:39 2014
@@ -20,9 +20,10 @@
*/
package org.apache.qpid.server.exchange;
+import java.security.AccessControlException;
import java.util.ArrayList;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
@@ -32,9 +33,14 @@ import org.apache.qpid.server.message.In
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Publisher;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.adapter.AbstractConfiguredObject;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -44,6 +50,8 @@ import org.apache.qpid.server.txn.Server
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -58,19 +66,22 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-public abstract class AbstractExchange<T extends Exchange> implements Exchange<T>
+public abstract class AbstractExchange<T extends AbstractExchange<T>>
+ extends AbstractConfiguredObject<T>
+ implements NonDefaultExchange<T>
{
private static final Logger _logger = Logger.getLogger(AbstractExchange.class);
+ private final LifetimePolicy _lifetimePolicy;
private String _name;
private final AtomicBoolean _closed = new AtomicBoolean();
- private Exchange _alternateExchange;
+ private NonDefaultExchange _alternateExchange;
private boolean _durable;
private VirtualHost _virtualHost;
- private final List<Action<Exchange>> _closeTaskList = new CopyOnWriteArrayList<Action<Exchange>>();
+ private final List<Action<ExchangeImpl>> _closeTaskList = new CopyOnWriteArrayList<Action<ExchangeImpl>>();
/**
* Whether the exchange is automatically deleted once all queues have detached from it
@@ -81,8 +92,7 @@ public abstract class AbstractExchange<T
private LogSubject _logSubject;
private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
- private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<Binding>();
- private final UUID _id;
+ private final CopyOnWriteArrayList<BindingImpl> _bindings = new CopyOnWriteArrayList<BindingImpl>();
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
private final AtomicLong _receivedMessageCount = new AtomicLong();
private final AtomicLong _receivedMessageSize = new AtomicLong();
@@ -91,23 +101,25 @@ public abstract class AbstractExchange<T
private final AtomicLong _droppedMessageCount = new AtomicLong();
private final AtomicLong _droppedMessageSize = new AtomicLong();
- private final CopyOnWriteArrayList<Exchange.BindingListener> _listeners = new CopyOnWriteArrayList<Exchange.BindingListener>();
+ private final CopyOnWriteArrayList<ExchangeImpl.BindingListener> _listeners = new CopyOnWriteArrayList<ExchangeImpl.BindingListener>();
- private final ConcurrentHashMap<BindingIdentifier, Binding> _bindingsMap = new ConcurrentHashMap<BindingIdentifier, Binding>();
+ private final ConcurrentHashMap<BindingIdentifier, BindingImpl> _bindingsMap = new ConcurrentHashMap<BindingIdentifier, BindingImpl>();
-
- //TODO : persist creation time
- private long _createTime = System.currentTimeMillis();
- private StateChangeListener<Binding, State> _bindingListener;
+ private StateChangeListener<BindingImpl, State> _bindingListener;
public AbstractExchange(VirtualHost vhost, Map<String, Object> attributes) throws UnknownExchangeException
{
+ super(MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID, attributes),
+ Collections.<String,Object>emptyMap(), attributes, vhost.getTaskExecutor());
_virtualHost = vhost;
- _id = MapValueConverter.getUUIDAttribute(org.apache.qpid.server.model.Exchange.ID, attributes);
_name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributes);
_durable = MapValueConverter.getBooleanAttribute(org.apache.qpid.server.model.Exchange.DURABLE, attributes);
- _autoDelete = MapValueConverter.getEnumAttribute(LifetimePolicy.class, org.apache.qpid.server.model.Exchange.LIFETIME_POLICY, attributes, LifetimePolicy.PERMANENT) != LifetimePolicy.PERMANENT;
+ _lifetimePolicy = MapValueConverter.getEnumAttribute(LifetimePolicy.class,
+ org.apache.qpid.server.model.Exchange.LIFETIME_POLICY,
+ attributes,
+ LifetimePolicy.PERMANENT);
+ _autoDelete = _lifetimePolicy != LifetimePolicy.PERMANENT;
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
// check ACL
@@ -116,9 +128,9 @@ public abstract class AbstractExchange<T
Object alternateExchangeAttr = attributes.get(org.apache.qpid.server.model.Exchange.ALTERNATE_EXCHANGE);
if(alternateExchangeAttr != null)
{
- if(alternateExchangeAttr instanceof Exchange)
+ if(alternateExchangeAttr instanceof ExchangeImpl)
{
- setAlternateExchange((Exchange) alternateExchangeAttr);
+ setAlternateExchange((ExchangeImpl) alternateExchangeAttr);
}
else if(alternateExchangeAttr instanceof UUID)
{
@@ -146,10 +158,10 @@ public abstract class AbstractExchange<T
}
}
- _bindingListener = new StateChangeListener<Binding, State>()
+ _bindingListener = new StateChangeListener<BindingImpl, State>()
{
@Override
- public void stateChanged(final Binding binding, final State oldState, final State newState)
+ public void stateChanged(final BindingImpl binding, final State oldState, final State newState)
{
if(newState == State.DELETED)
{
@@ -184,8 +196,8 @@ public abstract class AbstractExchange<T
if(_closed.compareAndSet(false,true))
{
- List<Binding> bindings = new ArrayList<Binding>(_bindings);
- for(Binding binding : bindings)
+ List<BindingImpl> bindings = new ArrayList<BindingImpl>(_bindings);
+ for(BindingImpl binding : bindings)
{
binding.removeStateChangeListener(_bindingListener);
binding.delete();
@@ -198,7 +210,7 @@ public abstract class AbstractExchange<T
CurrentActor.get().message(_logSubject, ExchangeMessages.DELETED());
- for(Action<Exchange> task : _closeTaskList)
+ for(Action<ExchangeImpl> task : _closeTaskList)
{
task.performAction(this);
}
@@ -218,7 +230,7 @@ public abstract class AbstractExchange<T
public final boolean isBound(String bindingKey, Map<String,Object> arguments, AMQQueue queue)
{
- for(Binding b : _bindings)
+ for(BindingImpl b : _bindings)
{
if(bindingKey.equals(b.getBindingKey()) && queue == b.getAMQQueue())
{
@@ -232,7 +244,7 @@ public abstract class AbstractExchange<T
public final boolean isBound(String bindingKey, AMQQueue queue)
{
- for(Binding b : _bindings)
+ for(BindingImpl b : _bindings)
{
if(bindingKey.equals(b.getBindingKey()) && queue == b.getAMQQueue())
{
@@ -244,7 +256,7 @@ public abstract class AbstractExchange<T
public final boolean isBound(String bindingKey)
{
- for(Binding b : _bindings)
+ for(BindingImpl b : _bindings)
{
if(bindingKey.equals(b.getBindingKey()))
{
@@ -256,7 +268,7 @@ public abstract class AbstractExchange<T
public final boolean isBound(AMQQueue queue)
{
- for(Binding b : _bindings)
+ for(BindingImpl b : _bindings)
{
if(queue == b.getAMQQueue())
{
@@ -269,7 +281,7 @@ public abstract class AbstractExchange<T
@Override
public final boolean isBound(Map<String, Object> arguments, AMQQueue queue)
{
- for(Binding b : _bindings)
+ for(BindingImpl b : _bindings)
{
if(queue == b.getAMQQueue() &&
((b.getArguments() == null || b.getArguments().isEmpty())
@@ -285,7 +297,7 @@ public abstract class AbstractExchange<T
public final boolean isBound(Map<String, Object> arguments)
{
- for(Binding b : _bindings)
+ for(BindingImpl b : _bindings)
{
if(((b.getArguments() == null || b.getArguments().isEmpty())
? (arguments == null || arguments.isEmpty())
@@ -301,7 +313,7 @@ public abstract class AbstractExchange<T
@Override
public final boolean isBound(String bindingKey, Map<String, Object> arguments)
{
- for(Binding b : _bindings)
+ for(BindingImpl b : _bindings)
{
if(b.getBindingKey().equals(bindingKey) &&
((b.getArguments() == null || b.getArguments().isEmpty())
@@ -319,12 +331,12 @@ public abstract class AbstractExchange<T
return !_bindings.isEmpty();
}
- public Exchange getAlternateExchange()
+ public NonDefaultExchange getAlternateExchange()
{
return _alternateExchange;
}
- public void setAlternateExchange(Exchange exchange)
+ public void setAlternateExchange(NonDefaultExchange exchange)
{
if(_alternateExchange != null)
{
@@ -353,17 +365,17 @@ public abstract class AbstractExchange<T
return !_referrers.isEmpty();
}
- public void addCloseTask(final Action<Exchange> task)
+ public void addCloseTask(final Action<ExchangeImpl> task)
{
_closeTaskList.add(task);
}
- public void removeCloseTask(final Action<Exchange> task)
+ public void removeCloseTask(final Action<ExchangeImpl> task)
{
_closeTaskList.remove(task);
}
- public final void doAddBinding(final Binding binding)
+ public final void doAddBinding(final BindingImpl binding)
{
_bindings.add(binding);
int bindingCountSize = _bindings.size();
@@ -384,7 +396,7 @@ public abstract class AbstractExchange<T
return _bindingCountHigh.get();
}
- public final void doRemoveBinding(final Binding binding)
+ public final void doRemoveBinding(final BindingImpl binding)
{
onUnbind(binding);
for(BindingListener listener : _listeners)
@@ -394,14 +406,14 @@ public abstract class AbstractExchange<T
_bindings.remove(binding);
}
- public final Collection<Binding> getBindings()
+ public final Collection<BindingImpl> getBindings()
{
return Collections.unmodifiableList(_bindings);
}
- protected abstract void onBind(final Binding binding);
+ protected abstract void onBind(final BindingImpl binding);
- protected abstract void onUnbind(final Binding binding);
+ protected abstract void onUnbind(final BindingImpl binding);
public String getName()
@@ -414,11 +426,6 @@ public abstract class AbstractExchange<T
return Collections.emptyMap();
}
- public UUID getId()
- {
- return _id;
- }
-
public long getBindingCount()
{
return getBindings().size();
@@ -469,13 +476,13 @@ public abstract class AbstractExchange<T
public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
+ final Action<? super MessageInstance> postEnqueueAction)
{
List<? extends BaseQueue> queues = route(message, instanceProperties);
if(queues == null || queues.isEmpty())
{
- Exchange altExchange = getAlternateExchange();
+ ExchangeImpl altExchange = getAlternateExchange();
if(altExchange != null)
{
return altExchange.send(message, instanceProperties, txn, postEnqueueAction);
@@ -520,7 +527,8 @@ public abstract class AbstractExchange<T
protected abstract List<? extends BaseQueue> doRoute(final ServerMessage message,
final InstanceProperties instanceProperties);
- public long getMsgReceives()
+ @Override
+ public long getMessagesIn()
{
return _receivedMessageCount.get();
}
@@ -530,12 +538,14 @@ public abstract class AbstractExchange<T
return _routedMessageCount.get();
}
- public long getMsgDrops()
+ @Override
+ public long getMessagesDropped()
{
return _droppedMessageCount.get();
}
- public long getByteReceives()
+ @Override
+ public long getBytesIn()
{
return _receivedMessageSize.get();
}
@@ -545,16 +555,12 @@ public abstract class AbstractExchange<T
return _routedMessageSize.get();
}
- public long getByteDrops()
+ @Override
+ public long getBytesDropped()
{
return _droppedMessageSize.get();
}
- public long getCreateTime()
- {
- return _createTime;
- }
-
public void addBindingListener(final BindingListener listener)
{
_listeners.add(listener);
@@ -572,11 +578,11 @@ public abstract class AbstractExchange<T
}
@Override
- public boolean replaceBinding(final UUID id, final String bindingKey,
+ public boolean replaceBinding(final String bindingKey,
final AMQQueue queue,
final Map<String, Object> arguments)
{
- return makeBinding(id, bindingKey, queue, arguments, false, true);
+ return makeBinding(getBinding(bindingKey,queue).getId(), bindingKey, queue, arguments, false, true);
}
@Override
@@ -586,7 +592,7 @@ public abstract class AbstractExchange<T
makeBinding(id, bindingKey,queue, argumentMap,true, false);
}
- private void removeBinding(final Binding binding)
+ private void removeBinding(final BindingImpl binding)
{
String bindingKey = binding.getBindingKey();
AMQQueue queue = binding.getAMQQueue();
@@ -601,7 +607,7 @@ public abstract class AbstractExchange<T
// Check access
_virtualHost.getSecurityManager().authoriseUnbind(this, bindingKey, queue);
- Binding b = _bindingsMap.remove(new BindingIdentifier(bindingKey,queue));
+ BindingImpl b = _bindingsMap.remove(new BindingIdentifier(bindingKey,queue));
if (b != null)
{
@@ -617,9 +623,7 @@ public abstract class AbstractExchange<T
}
-
- @Override
- public Binding getBinding(String bindingKey, AMQQueue queue)
+ public BindingImpl getBinding(String bindingKey, AMQQueue queue)
{
assert queue != null;
@@ -656,8 +660,8 @@ public abstract class AbstractExchange<T
bindingKey,
_virtualHost.getName());
}
- Binding b = new Binding(id, bindingKey, queue, this, arguments);
- Binding existingMapping = _bindingsMap.putIfAbsent(new BindingIdentifier(bindingKey,queue), b);
+ BindingImpl b = new BindingImpl(id, bindingKey, queue, this, arguments);
+ BindingImpl existingMapping = _bindingsMap.putIfAbsent(new BindingIdentifier(bindingKey,queue), b);
if (existingMapping == null || force)
{
b.addStateChangeListener(_bindingListener);
@@ -682,6 +686,79 @@ public abstract class AbstractExchange<T
}
}
+ @Override
+ protected boolean setState(final State currentState, final State desiredState)
+ {
+ if(desiredState == State.DELETED)
+ {
+ try
+ {
+ _virtualHost.removeExchange(this,true);
+ }
+ catch (ExchangeIsAlternateException e)
+ {
+ return false;
+ }
+ catch (RequiredExchangeException e)
+ {
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public String setName(final String currentName, final String desiredName)
+ throws IllegalStateException, AccessControlException
+ {
+ return null;
+ }
+
+ @Override
+ public State getState()
+ {
+ return _closed.get() ? State.DELETED : State.ACTIVE;
+ }
+
+ @Override
+ public void setDurable(final boolean durable)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ if(durable == isDurable())
+ {
+ return;
+ }
+ throw new IllegalArgumentException();
+ }
+
+ @Override
+ public LifetimePolicy getLifetimePolicy()
+ {
+ return _lifetimePolicy;
+ }
+
+ @Override
+ public LifetimePolicy setLifetimePolicy(final LifetimePolicy expected, final LifetimePolicy desired)
+ throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ // TODO
+ return _lifetimePolicy;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
+ {
+ if(org.apache.qpid.server.model.Binding.class.isAssignableFrom(clazz))
+ {
+
+ return (Collection<C>) getBindings();
+ }
+ else
+ {
+ return Collections.EMPTY_SET;
+ }
+ }
private static final class BindingIdentifier
{
@@ -739,4 +816,120 @@ public abstract class AbstractExchange<T
}
}
+ @Override
+ public Collection<Publisher> getPublishers()
+ {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public boolean deleteBinding(final String bindingKey, final AMQQueue queue)
+ {
+ final BindingImpl binding = getBinding(bindingKey, queue);
+ if(binding == null)
+ {
+ return false;
+ }
+ else
+ {
+ binding.delete();
+ return true;
+ }
+ }
+
+ @Override
+ public boolean hasBinding(final String bindingKey, final AMQQueue queue)
+ {
+ return getBinding(bindingKey,queue) != null;
+ }
+
+ @Override
+ public void setAlternateExchange(final ExchangeImpl exchange)
+ {
+ // todo
+ _alternateExchange = (NonDefaultExchange) exchange;
+ }
+
+ @Override
+ public org.apache.qpid.server.model.Binding createBinding(final String bindingKey,
+ final Queue queue,
+ final Map<String, Object> bindingArguments,
+ final Map<String, Object> attributes)
+ {
+ addBinding(bindingKey, (AMQQueue) queue, bindingArguments);
+ final BindingImpl binding = getBinding(bindingKey, (AMQQueue) queue);
+ childAdded(binding);
+ return binding;
+ }
+
+ @Override
+ public void delete()
+ {
+ try
+ {
+ _virtualHost.removeExchange(this,true);
+ }
+ catch (ExchangeIsAlternateException e)
+ {
+ throw new UnsupportedOperationException(e.getMessage(),e);
+ }
+ catch (RequiredExchangeException e)
+ {
+ throw new UnsupportedOperationException("'"+e.getMessage()+"' is a reserved exchange and can't be deleted",e);
+ }
+ }
+
+
+ @Override
+ public <T extends ConfiguredObject> T getParent(final Class<T> clazz)
+ {
+ if(clazz == org.apache.qpid.server.model.VirtualHost.class)
+ {
+ return (T) _virtualHost.getModel();
+ }
+ return super.getParent(clazz);
+ }
+
+ @Override
+ public Collection<String> getAttributeNames()
+ {
+ return getAttributeNames(getClass());
+ }
+
+ @Override
+ public Object getAttribute(final String name)
+ {
+ if(ConfiguredObject.STATE.equals(name))
+ {
+ return getState();
+ }
+ else if(LIFETIME_POLICY.equals(name))
+ {
+ return getLifetimePolicy();
+ }
+ else if(DURABLE.equals(name))
+ {
+ return isDurable();
+ }
+ return super.getAttribute(name);
+ }
+
+
+ @Override
+ protected void changeAttributes(Map<String, Object> attributes)
+ {
+ throw new UnsupportedOperationException("Changing attributes on exchange is not supported.");
+ }
+
+ @Override
+ protected void authoriseSetAttribute(String name, Object expected, Object desired) throws AccessControlException
+ {
+ _virtualHost.getSecurityManager().authoriseUpdate(this);
+ }
+
+ @Override
+ protected void authoriseSetAttributes(Map<String, Object> attributes) throws AccessControlException
+ {
+ _virtualHost.getSecurityManager().authoriseUpdate(this);
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java Wed Feb 26 23:27:39 2014
@@ -19,17 +19,14 @@
package org.apache.qpid.server.exchange;
import java.security.AccessControlException;
-import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.consumer.Consumer;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageInstance;
@@ -46,14 +43,13 @@ import org.apache.qpid.server.util.Actio
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHost;
-public class DefaultExchange implements Exchange<DirectExchange>
+public class DefaultExchange implements ExchangeImpl<DirectExchange>
{
private final QueueRegistry _queueRegistry;
private UUID _id;
private VirtualHost _virtualHost;
private static final Logger _logger = Logger.getLogger(DefaultExchange.class);
- private final AtomicBoolean _closed = new AtomicBoolean();
private Map<ExchangeReferrer,Object> _referrers = new ConcurrentHashMap<ExchangeReferrer,Object>();
@@ -76,44 +72,27 @@ public class DefaultExchange implements
return DirectExchange.TYPE;
}
- @Override
- public long getBindingCount()
- {
- return _virtualHost.getQueues().size();
- }
-
- @Override
- public long getByteDrops()
- {
- return 0;
- }
-
- @Override
- public long getByteReceives()
- {
- return 0;
- }
@Override
- public long getMsgDrops()
+ public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
{
- return 0;
+ throw new AccessControlException("Cannot add bindings to the default exchange");
}
@Override
- public long getMsgReceives()
+ public boolean deleteBinding(final String bindingKey, final AMQQueue queue)
{
- return 0;
+ throw new AccessControlException("Cannot delete bindings from the default exchange");
}
@Override
- public boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ public boolean hasBinding(final String bindingKey, final AMQQueue queue)
{
- throw new AccessControlException("Cannot add bindings to the default exchange");
+ return false;
}
@Override
- public boolean replaceBinding(UUID id, String bindingKey, AMQQueue queue, Map<String, Object> arguments)
+ public boolean replaceBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments)
{
throw new AccessControlException("Cannot replace bindings on the default exchange");
}
@@ -125,34 +104,6 @@ public class DefaultExchange implements
}
@Override
- public Binding getBinding(String bindingKey, AMQQueue queue)
- {
- if(_virtualHost.getQueue(bindingKey) == queue)
- {
- return convertToBinding(queue);
- }
- else
- {
- return null;
- }
-
- }
-
- private Binding convertToBinding(AMQQueue queue)
- {
- String queueName = queue.getName();
-
- UUID exchangeId = UUIDGenerator.generateBindingUUID(ExchangeDefaults.DEFAULT_EXCHANGE_NAME,
- queueName,
- queueName,
- _virtualHost.getName());
-
- final Binding binding = new Binding(exchangeId, queueName, queue, this, Collections.EMPTY_MAP);
- binding.addStateChangeListener(STATE_CHANGE_LISTENER);
- return binding;
- }
-
- @Override
public String getTypeName()
{
return getExchangeType().getType();
@@ -185,7 +136,7 @@ public class DefaultExchange implements
@Override
public boolean hasBindings()
{
- return getBindingCount() != 0;
+ return !_virtualHost.getQueues().isEmpty();
}
@Override
@@ -225,13 +176,13 @@ public class DefaultExchange implements
}
@Override
- public Exchange getAlternateExchange()
+ public ExchangeImpl getAlternateExchange()
{
return null;
}
@Override
- public void setAlternateExchange(Exchange exchange)
+ public void setAlternateExchange(ExchangeImpl exchange)
{
_logger.warn("Cannot set the alternate exchange for the default exchange");
}
@@ -255,38 +206,9 @@ public class DefaultExchange implements
}
@Override
- public Collection<Binding> getBindings()
- {
- List<Binding> bindings = new ArrayList<Binding>();
- for(AMQQueue q : _virtualHost.getQueues())
- {
- bindings.add(convertToBinding(q));
- }
- return bindings;
- }
-
- @Override
public void addBindingListener(BindingListener listener)
{
- _queueRegistry.addRegistryChangeListener(convertListener(listener));
- }
-
- private QueueRegistry.RegistryChangeListener convertListener(final BindingListener listener)
- {
- return new QueueRegistry.RegistryChangeListener()
- {
- @Override
- public void queueRegistered(AMQQueue queue)
- {
- listener.bindingAdded(DefaultExchange.this, convertToBinding(queue));
- }
- @Override
- public void queueUnregistered(AMQQueue queue)
- {
- listener.bindingRemoved(DefaultExchange.this, convertToBinding(queue));
- }
- };
}
@Override
@@ -304,7 +226,7 @@ public class DefaultExchange implements
public final <M extends ServerMessage<? extends StorableMessageMetaData>> int send(final M message,
final InstanceProperties instanceProperties,
final ServerTransaction txn,
- final Action<? super MessageInstance<?, ? extends Consumer>> postEnqueueAction)
+ final Action<? super MessageInstance> postEnqueueAction)
{
final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey());
if(q == null)
@@ -338,11 +260,11 @@ public class DefaultExchange implements
}
}
- private static final StateChangeListener<Binding, State> STATE_CHANGE_LISTENER =
- new StateChangeListener<Binding, State>()
+ private static final StateChangeListener<BindingImpl, State> STATE_CHANGE_LISTENER =
+ new StateChangeListener<BindingImpl, State>()
{
@Override
- public void stateChanged(final Binding object, final State oldState, final State newState)
+ public void stateChanged(final BindingImpl object, final State oldState, final State newState)
{
if(newState == State.DELETED)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java Wed Feb 26 23:27:39 2014
@@ -23,7 +23,6 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.util.MapValueConverter;
@@ -33,7 +32,6 @@ import org.apache.qpid.server.virtualhos
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
public class DefaultExchangeFactory implements ExchangeFactory
{
@@ -48,7 +46,7 @@ public class DefaultExchangeFactory impl
ExchangeDefaults.TOPIC_EXCHANGE_CLASS};
private final VirtualHost _host;
- private Map<String, ExchangeType<? extends Exchange>> _exchangeClassMap = new HashMap<String, ExchangeType<? extends Exchange>>();
+ private Map<String, ExchangeType<? extends ExchangeImpl>> _exchangeClassMap = new HashMap<String, ExchangeType<? extends ExchangeImpl>>();
public DefaultExchangeFactory(VirtualHost host)
{
@@ -92,17 +90,17 @@ public class DefaultExchangeFactory impl
return new QpidServiceLoader<ExchangeType>().atLeastOneInstanceOf(ExchangeType.class);
}
- public Collection<ExchangeType<? extends Exchange>> getRegisteredTypes()
+ public Collection<ExchangeType<? extends ExchangeImpl>> getRegisteredTypes()
{
return _exchangeClassMap.values();
}
@Override
- public Exchange createExchange(final Map<String, Object> attributes)
+ public NonDefaultExchange createExchange(final Map<String, Object> attributes)
throws AMQUnknownExchangeType, UnknownExchangeException
{
String type = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributes);
- ExchangeType<? extends Exchange> exchType = _exchangeClassMap.get(type);
+ ExchangeType<? extends ExchangeImpl> exchType = _exchangeClassMap.get(type);
if (exchType == null)
{
throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null);
@@ -111,7 +109,7 @@ public class DefaultExchangeFactory impl
}
@Override
- public Exchange restoreExchange(Map<String,Object> attributes)
+ public NonDefaultExchange restoreExchange(Map<String,Object> attributes)
throws AMQUnknownExchangeType, UnknownExchangeException
{
return createExchange(attributes);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Wed Feb 26 23:27:39 2014
@@ -46,9 +46,9 @@ public class DefaultExchangeRegistry imp
/**
* Maps from exchange name to exchange instance
*/
- private ConcurrentMap<String, Exchange> _exchangeMap = new ConcurrentHashMap<String, Exchange>();
+ private ConcurrentMap<String, ExchangeImpl> _exchangeMap = new ConcurrentHashMap<String, ExchangeImpl>();
- private Exchange _defaultExchange;
+ private ExchangeImpl _defaultExchange;
private final VirtualHost _host;
private final QueueRegistry _queueRegistry;
@@ -77,7 +77,7 @@ public class DefaultExchangeRegistry imp
private void initialiseExchanges(ExchangeFactory factory, DurableConfigurationStore store)
{
- for (ExchangeType<? extends Exchange> type : factory.getRegisteredTypes())
+ for (ExchangeType<? extends ExchangeImpl> type : factory.getRegisteredTypes())
{
defineExchange(factory, type.getDefaultExchangeName(), type.getType(), store);
}
@@ -96,7 +96,7 @@ public class DefaultExchangeRegistry imp
attributes.put(org.apache.qpid.server.model.Exchange.NAME, name);
attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type);
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
- Exchange exchange = f.createExchange(attributes);
+ ExchangeImpl exchange = f.createExchange(attributes);
registerExchange(exchange);
if(exchange.isDurable())
{
@@ -122,7 +122,7 @@ public class DefaultExchangeRegistry imp
return _host.getDurableConfigurationStore();
}
- public void registerExchange(Exchange exchange)
+ public void registerExchange(ExchangeImpl exchange)
{
_exchangeMap.put(exchange.getName(), exchange);
synchronized (_listeners)
@@ -135,14 +135,14 @@ public class DefaultExchangeRegistry imp
}
}
- public Exchange getDefaultExchange()
+ public ExchangeImpl getDefaultExchange()
{
return _defaultExchange;
}
public boolean unregisterExchange(String name, boolean inUse)
{
- final Exchange exchange = _exchangeMap.get(name);
+ final ExchangeImpl exchange = _exchangeMap.get(name);
if (exchange != null)
{
@@ -150,7 +150,7 @@ public class DefaultExchangeRegistry imp
// TODO: check inUse argument
- Exchange e = _exchangeMap.remove(name);
+ ExchangeImpl e = _exchangeMap.remove(name);
// if it is null then it was removed by another thread at the same time, we can ignore
if (e != null)
{
@@ -170,9 +170,17 @@ public class DefaultExchangeRegistry imp
}
- public Collection<Exchange> getExchanges()
+ public Collection<ExchangeImpl> getExchanges()
{
- return new ArrayList<Exchange>(_exchangeMap.values());
+ return new ArrayList<ExchangeImpl>(_exchangeMap.values());
+ }
+
+ @Override
+ public Collection<NonDefaultExchange> getExchangesExceptDefault()
+ {
+ Collection allExchanges = getExchanges();
+ allExchanges.remove(_defaultExchange);
+ return allExchanges;
}
public void addRegistryChangeListener(RegistryChangeListener listener)
@@ -180,7 +188,7 @@ public class DefaultExchangeRegistry imp
_listeners.add(listener);
}
- public Exchange getExchange(String name)
+ public ExchangeImpl getExchange(String name)
{
if ((name == null) || name.length() == 0)
{
@@ -195,7 +203,7 @@ public class DefaultExchangeRegistry imp
@Override
public void clearAndUnregisterMbeans()
{
- for (final Exchange exchange : getExchanges())
+ for (final ExchangeImpl exchange : getExchanges())
{
//TODO: this is a bit of a hack, what if the listeners aren't aware
//that we are just unregistering the MBean because of HA, and aren't
@@ -212,7 +220,7 @@ public class DefaultExchangeRegistry imp
}
@Override
- public synchronized Exchange getExchange(UUID exchangeId)
+ public synchronized ExchangeImpl getExchange(UUID exchangeId)
{
if (exchangeId == null)
{
@@ -220,8 +228,8 @@ public class DefaultExchangeRegistry imp
}
else
{
- Collection<Exchange> exchanges = _exchangeMap.values();
- for (Exchange exchange : exchanges)
+ Collection<ExchangeImpl> exchanges = _exchangeMap.values();
+ for (ExchangeImpl exchange : exchanges)
{
if (exchange.getId().equals(exchangeId))
{
@@ -239,8 +247,8 @@ public class DefaultExchangeRegistry imp
{
return true;
}
- Collection<ExchangeType<? extends Exchange>> registeredTypes = _host.getExchangeTypes();
- for (ExchangeType<? extends Exchange> type : registeredTypes)
+ Collection<ExchangeType<? extends ExchangeImpl>> registeredTypes = _host.getExchangeTypes();
+ for (ExchangeType<? extends ExchangeImpl> type : registeredTypes)
{
if (type.getDefaultExchangeName().equals(name))
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Wed Feb 26 23:27:39 2014
@@ -25,7 +25,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.Filterable;
@@ -51,17 +51,17 @@ public class DirectExchange extends Abst
private static final class BindingSet
{
- private CopyOnWriteArraySet<Binding> _bindings = new CopyOnWriteArraySet<Binding>();
+ private CopyOnWriteArraySet<BindingImpl> _bindings = new CopyOnWriteArraySet<BindingImpl>();
private List<BaseQueue> _unfilteredQueues = new ArrayList<BaseQueue>();
private Map<BaseQueue, MessageFilter> _filteredQueues = new HashMap<BaseQueue, MessageFilter>();
- public synchronized void addBinding(Binding binding)
+ public synchronized void addBinding(BindingImpl binding)
{
_bindings.add(binding);
recalculateQueues();
}
- public synchronized void removeBinding(Binding binding)
+ public synchronized void removeBinding(BindingImpl binding)
{
_bindings.remove(binding);
recalculateQueues();
@@ -72,7 +72,7 @@ public class DirectExchange extends Abst
List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
Map<BaseQueue, MessageFilter> filteredQueues = new HashMap<BaseQueue,MessageFilter>();
- for(Binding b : _bindings)
+ for(BindingImpl b : _bindings)
{
if(FilterSupport.argumentsContainFilter(b.getArguments()))
@@ -85,7 +85,7 @@ public class DirectExchange extends Abst
catch (AMQInvalidArgumentException e)
{
_logger.warn("Binding ignored: cannot parse filter on binding of queue '"+b.getAMQQueue().getName()
- + "' to exchange '" + b.getExchangeImpl().getName()
+ + "' to exchange '" + b.getExchange().getName()
+ "' with arguments: " + b.getArguments(), e);
}
@@ -109,7 +109,7 @@ public class DirectExchange extends Abst
return _unfilteredQueues;
}
- public CopyOnWriteArraySet<Binding> getBindings()
+ public CopyOnWriteArraySet<BindingImpl> getBindings()
{
return _bindings;
}
@@ -185,7 +185,7 @@ public class DirectExchange extends Abst
}
- protected void onBind(final Binding binding)
+ protected void onBind(final BindingImpl binding)
{
String bindingKey = binding.getBindingKey();
AMQQueue queue = binding.getAMQQueue();
@@ -209,7 +209,7 @@ public class DirectExchange extends Abst
}
- protected void onUnbind(final Binding binding)
+ protected void onUnbind(final BindingImpl binding)
{
assert binding != null;
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java Wed Feb 26 23:27:39 2014
@@ -25,16 +25,15 @@ import org.apache.qpid.server.virtualhos
import java.util.Collection;
import java.util.Map;
-import java.util.UUID;
public interface ExchangeFactory
{
- Collection<ExchangeType<? extends Exchange>> getRegisteredTypes();
+ Collection<ExchangeType<? extends ExchangeImpl>> getRegisteredTypes();
- Exchange createExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
+ NonDefaultExchange createExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
- Exchange restoreExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
+ NonDefaultExchange restoreExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
}
Copied: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java (from r1571506, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java?p2=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java&p1=qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java&r1=1571506&r2=1572343&rev=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java Wed Feb 26 23:27:39 2014
@@ -20,17 +20,15 @@
*/
package org.apache.qpid.server.exchange;
-import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.Collection;
import java.util.Map;
import java.util.UUID;
-public interface Exchange<T extends Exchange> extends ExchangeReferrer, MessageDestination
+public interface ExchangeImpl<T extends NonDefaultExchange> extends ExchangeReferrer, MessageDestination
{
UUID getId();
@@ -48,32 +46,22 @@ public interface Exchange<T extends Exch
*/
boolean isAutoDelete();
- Exchange getAlternateExchange();
-
- void setAlternateExchange(Exchange exchange);
-
- long getBindingCount();
-
- long getByteDrops();
-
- long getByteReceives();
-
- long getMsgDrops();
-
- long getMsgReceives();
+ ExchangeImpl getAlternateExchange();
+ void setAlternateExchange(ExchangeImpl exchange);
boolean addBinding(String bindingKey, AMQQueue queue, Map<String, Object> arguments);
+ boolean deleteBinding(String bindingKey, AMQQueue queue);
+ boolean hasBinding(String bindingKey, AMQQueue queue);
+
- boolean replaceBinding(UUID id, String bindingKey,
+ boolean replaceBinding(String bindingKey,
AMQQueue queue,
Map<String, Object> arguments);
void restoreBinding(UUID id, String bindingKey, AMQQueue queue,
Map<String, Object> argumentMap);
- Binding getBinding(String bindingKey, AMQQueue queue);
-
void close();
/**
@@ -108,8 +96,6 @@ public interface Exchange<T extends Exch
*/
boolean hasBindings();
- Collection<Binding> getBindings();
-
boolean isBound(AMQQueue queue);
boolean isBound(Map<String, Object> arguments);
@@ -128,8 +114,8 @@ public interface Exchange<T extends Exch
public interface BindingListener
{
- void bindingAdded(Exchange exchange, Binding binding);
- void bindingRemoved(Exchange exchange, Binding binding);
+ void bindingAdded(ExchangeImpl exchange, BindingImpl binding);
+ void bindingRemoved(ExchangeImpl exchange, BindingImpl binding);
}
public void addBindingListener(BindingListener listener);
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java Wed Feb 26 23:27:39 2014
@@ -26,13 +26,13 @@ import java.util.UUID;
public interface ExchangeRegistry
{
- void registerExchange(Exchange exchange);
+ void registerExchange(ExchangeImpl exchange);
- Exchange getDefaultExchange();
+ ExchangeImpl getDefaultExchange();
void initialise(ExchangeFactory exchangeFactory);
- Exchange getExchange(String exchangeName);
+ ExchangeImpl getExchange(String exchangeName);
/**
* Unregister an exchange
@@ -43,9 +43,11 @@ public interface ExchangeRegistry
void clearAndUnregisterMbeans();
- Exchange getExchange(UUID exchangeId);
+ ExchangeImpl getExchange(UUID exchangeId);
- Collection<Exchange> getExchanges();
+ Collection<ExchangeImpl> getExchanges();
+
+ Collection<NonDefaultExchange> getExchangesExceptDefault();
void addRegistryChangeListener(RegistryChangeListener listener);
@@ -58,7 +60,7 @@ public interface ExchangeRegistry
interface RegistryChangeListener
{
- void exchangeRegistered(Exchange exchange);
- void exchangeUnregistered(Exchange exchange);
+ void exchangeRegistered(ExchangeImpl exchange);
+ void exchangeUnregistered(ExchangeImpl exchange);
}
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Wed Feb 26 23:27:39 2014
@@ -27,7 +27,7 @@ import java.util.concurrent.CopyOnWriteA
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.Filterable;
@@ -55,10 +55,10 @@ public class FanoutExchange extends Abst
private final CopyOnWriteArrayList<AMQQueue> _unfilteredQueues = new CopyOnWriteArrayList<AMQQueue>();
private final CopyOnWriteArrayList<AMQQueue> _filteredQueues = new CopyOnWriteArrayList<AMQQueue>();
- private final AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>> _filteredBindings =
- new AtomicReference<Map<AMQQueue,Map<Binding, MessageFilter>>>();
+ private final AtomicReference<Map<AMQQueue,Map<BindingImpl, MessageFilter>>> _filteredBindings =
+ new AtomicReference<Map<AMQQueue,Map<BindingImpl, MessageFilter>>>();
{
- Map<AMQQueue,Map<Binding, MessageFilter>> emptyMap = Collections.emptyMap();
+ Map<AMQQueue,Map<BindingImpl, MessageFilter>> emptyMap = Collections.emptyMap();
_filteredBindings.set(emptyMap);
}
@@ -82,7 +82,7 @@ public class FanoutExchange extends Abst
public ArrayList<BaseQueue> doRoute(ServerMessage payload, final InstanceProperties instanceProperties)
{
- for(Binding b : getBindings())
+ for(BindingImpl b : getBindings())
{
b.incrementMatches();
}
@@ -90,12 +90,12 @@ public class FanoutExchange extends Abst
final ArrayList<BaseQueue> result = new ArrayList<BaseQueue>(_unfilteredQueues);
- final Map<AMQQueue, Map<Binding, MessageFilter>> filteredBindings = _filteredBindings.get();
+ final Map<AMQQueue, Map<BindingImpl, MessageFilter>> filteredBindings = _filteredBindings.get();
if(!_filteredQueues.isEmpty())
{
for(AMQQueue q : _filteredQueues)
{
- final Map<Binding, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q);
+ final Map<BindingImpl, MessageFilter> bindingMessageFilterMap = filteredBindings.get(q);
if(!(bindingMessageFilterMap == null || result.contains(q)))
{
for(MessageFilter filter : bindingMessageFilterMap.values())
@@ -122,7 +122,7 @@ public class FanoutExchange extends Abst
}
- protected synchronized void onBind(final Binding binding)
+ protected synchronized void onBind(final BindingImpl binding)
{
AMQQueue queue = binding.getAMQQueue();
assert queue != null;
@@ -148,17 +148,17 @@ public class FanoutExchange extends Abst
try
{
- HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings =
- new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get());
+ HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
+ new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
- Map<Binding, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
+ Map<BindingImpl, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
final
MessageFilter messageFilter =
FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue());
if(bindingsForQueue != null)
{
- bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue);
+ bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(bindingsForQueue);
bindingsForQueue.put(binding, messageFilter);
}
else
@@ -188,7 +188,7 @@ public class FanoutExchange extends Abst
}
}
- protected synchronized void onUnbind(final Binding binding)
+ protected synchronized void onUnbind(final BindingImpl binding)
{
AMQQueue queue = binding.getAMQQueue();
if(binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(binding.getArguments()))
@@ -210,13 +210,13 @@ public class FanoutExchange extends Abst
}
else // we are removing a binding with filters
{
- HashMap<AMQQueue,Map<Binding, MessageFilter>> filteredBindings =
- new HashMap<AMQQueue,Map<Binding, MessageFilter>>(_filteredBindings.get());
+ HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
+ new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
- Map<Binding,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
+ Map<BindingImpl,MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
if(bindingsForQueue.size()>1)
{
- bindingsForQueue = new HashMap<Binding,MessageFilter>(bindingsForQueue);
+ bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(bindingsForQueue);
bindingsForQueue.remove(binding);
filteredBindings.put(binding.getAMQQueue(),bindingsForQueue);
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?rev=1572343&r1=1572342&r2=1572343&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Wed Feb 26 23:27:39 2014
@@ -22,7 +22,7 @@ package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterSupport;
import org.apache.qpid.server.filter.MessageFilter;
@@ -42,7 +42,7 @@ class HeadersBinding
private static final Logger _logger = Logger.getLogger(HeadersBinding.class);
private final Map<String,Object> _mappings;
- private final Binding _binding;
+ private final BindingImpl _binding;
private final Set<String> required = new HashSet<String>();
private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
@@ -56,7 +56,7 @@ class HeadersBinding
*
* @param binding the binding to create a header binding using
*/
- public HeadersBinding(Binding binding)
+ public HeadersBinding(BindingImpl binding)
{
_binding = binding;
if(_binding !=null)
@@ -81,7 +81,7 @@ class HeadersBinding
catch (AMQInvalidArgumentException e)
{
_logger.warn("Invalid filter in binding queue '"+_binding.getAMQQueue().getName()
- +"' to exchange '"+_binding.getExchangeImpl().getName()
+ +"' to exchange '"+_binding.getExchange().getName()
+"' with arguments: " + _binding.getArguments());
_filter = new MessageFilter()
{
@@ -112,7 +112,7 @@ class HeadersBinding
}
}
- public Binding getBinding()
+ public BindingImpl getBinding()
{
return _binding;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org