You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/20 18:23:11 UTC
svn commit: r827724 [1/8] - in /qpid/branches/java-broker-0-10/qpid/java: ./
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/
broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/
broker/src/main/java/org/apach...
Author: rgodfrey
Date: Tue Oct 20 16:23:01 2009
New Revision: 827724
URL: http://svn.apache.org/viewvc?rev=827724&view=rev
Log:
Implemented persistence, changed transactions, implemented message conversion, and made fixes for all Java tests
Added:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java (contents, props changed)
- copied, changed from r821930, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeReferrer.java (contents, props changed)
- copied, changed from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessage.java (contents, props changed)
- copied, changed from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageContentSource.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData.java (contents, props changed)
- copied, changed from r821930, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_0_10.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/HeaderPropertiesConverter.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngineFactory_0_10.java (contents, props changed)
- copied, changed from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (contents, props changed)
- copied, changed from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueContext.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/PrincipalHolder.java (contents, props changed)
- copied, changed from r821930, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreRecoveryHandler.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoredMessage.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/ServerTransaction.java (contents, props changed)
- copied, changed from r824084, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (with props)
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
- copied, changed from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockStoredMessage.java (with props)
qpid/branches/java-broker-0-10/qpid/java/test-profiles/java.0.10.testprofile (with props)
Removed:
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ConsumerTagNotUniqueException.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/PrincipalHolder.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngineFactory_0_10.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ProtocolEngine_0_10.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/RequiredDeliveryException.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/NoRouteException.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentHolder.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyContentIterator.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/BodyFrameIterator.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/UnauthorizedAccessException.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/Transaction.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/util/ConcurrentLinkedQueueNoSize.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessageHandle.java
Modified:
qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchangeType.java
qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchangeType.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/LogicExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/MessageFilter.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages.properties
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/messages/LogMessages_en_US.properties
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/DefaultManagedObject.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/JMXManagedObjectRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/management/ManagedObject.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageHeader.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/AMQMessageReference.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ContentHeaderBodyAdapter.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferHeader.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageTransferMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/message/ServerMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverter.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ProtocolOutputConverterImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9/ProtocolOutputConverterImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AbstractACLPlugin.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/BasicACLPlugin.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/network/FirewallPlugin.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DerbyMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLog.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/MessageAcceptCompletionListener.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/AutoCommitTransaction.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Copy.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Dump.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/List.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/AMQBrokerManagerMBeanTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPChannelActorTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/actors/AMQPConnectionActorTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/logging/messages/MessageStoreMessagesTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBeanTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/MaxChannelsTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AckTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueThreadPoolTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ExchangeDenier.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/PrincipalPermissionsTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/QueueDenier.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/SkeletonMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestReferenceCounting.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/NullApplicationRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/server/util/TestApplicationRegistry.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/test/java/org/apache/qpid/util/MockChannel.java
qpid/branches/java-broker-0-10/qpid/java/broker/src/velocity/java/org/apache/qpid/server/logging/GenerateLogMessages.java
qpid/branches/java-broker-0-10/qpid/java/build.deps
qpid/branches/java-broker-0-10/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/branches/java-broker-0-10/qpid/java/common/Composite.tpl
qpid/branches/java-broker-0-10/qpid/java/common/genutil.py
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
qpid/branches/java-broker-0-10/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MINANetworkDriver.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/client/CancelTest.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/client/DupsOkTest.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/basic/LargeMessageTest.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
qpid/branches/java-broker-0-10/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/ct/DurableSubscriberTest.java
Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchange.java Tue Oct 20 16:23:01 2009
@@ -38,21 +38,25 @@
import org.apache.qpid.junit.extensions.util.SizeOf;
import org.apache.qpid.management.common.mbeans.annotations.MBeanConstructor;
import org.apache.qpid.management.common.mbeans.annotations.MBeanDescription;
+import org.apache.log4j.Logger;
/**
- *
+ *
* This is a special diagnostic exchange type which doesn't actually do anything
* with messages. When it receives a message, it writes information about the
* current memory usage to the "memory" property of the message and places it on the
- * diagnosticqueue for retrieval
- *
+ * diagnosticqueue for retrieval
+ *
* @author Aidan Skinner
- *
+ *
*/
public class DiagnosticExchange extends AbstractExchange
{
-
+
+ private static final Logger _logger = Logger.getLogger(DiagnosticExchange.class);
+
+
public static final AMQShortString DIAGNOSTIC_EXCHANGE_CLASS = new AMQShortString("x-diagnostic");
public static final AMQShortString DIAGNOSTIC_EXCHANGE_NAME = new AMQShortString("diagnostic");
@@ -70,7 +74,7 @@
/**
* Usual constructor.
- *
+ *
* @throws JMException
*/
@MBeanConstructor("Creates an MBean for AMQ Diagnostic exchange")
@@ -83,7 +87,7 @@
/**
* Returns nothing, there can be no tabular data for this...
- *
+ *
* @throws OpenDataException
* @returns null
* @todo ... or can there? Could this actually return all the
@@ -97,7 +101,7 @@
/**
* This exchange type doesn't support queues, so this method does
* nothing.
- *
+ *
* @param queueName
* the queue you'll fail to create
* @param binding
@@ -114,22 +118,20 @@
/**
* Creates a new MBean instance
- *
+ *
* @return the newly created MBean
* @throws AMQException
* if something goes wrong
*/
- protected ExchangeMBean createMBean() throws AMQException
+ protected ExchangeMBean createMBean() throws JMException
{
- try
- {
- return new DiagnosticExchange.DiagnosticExchangeMBean();
- }
- catch (JMException ex)
- {
- // _logger.error("Exception occured in creating the direct exchange mbean", ex);
- throw new AMQException(null, "Exception occured in creating the direct exchange mbean", ex);
- }
+ return new DiagnosticExchange.DiagnosticExchangeMBean();
+
+ }
+
+ public Logger getLogger()
+ {
+ return _logger;
}
public AMQShortString getType()
@@ -139,7 +141,7 @@
/**
* Does nothing.
- *
+ *
* @param routingKey
* pointless
* @param queue
@@ -162,7 +164,7 @@
/**
* Does nothing.
- *
+ *
* @param routingKey
* pointless
* @param queue
@@ -199,7 +201,7 @@
public ArrayList<AMQQueue> route(InboundMessage payload)
{
-
+
Long value = new Long(SizeOf.getUsedMemory());
AMQShortString key = new AMQShortString("memory");
@@ -212,10 +214,10 @@
ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
queues.add(q);
return queues;
-
+
}
-
+
public boolean isBound(AMQShortString routingKey, FieldTable arguments,
AMQQueue queue) {
// TODO Auto-generated method stub
Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchangeType.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchangeType.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/DiagnosticExchangeType.java Tue Oct 20 16:23:01 2009
@@ -31,7 +31,7 @@
*/
public final class DiagnosticExchangeType implements ExchangeType<DiagnosticExchange>
{
-
+
public AMQShortString getName()
{
return DiagnosticExchange.DIAGNOSTIC_EXCHANGE_CLASS;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchange.java Tue Oct 20 16:23:01 2009
@@ -1,6 +1,6 @@
package org.apache.qpid.extras.exchanges.example;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,16 +8,16 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
@@ -29,6 +29,7 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.InboundMessage;
@@ -76,12 +77,27 @@
public Exchange getAlternateExchange()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public void setAlternateExchange(Exchange exchange)
{
- //To change body of implemented methods use File | Settings | File Templates.
+
+ }
+
+ public void removeReference(ExchangeReferrer exchange)
+ {
+
+ }
+
+ public void addReference(ExchangeReferrer exchange)
+ {
+
+ }
+
+ public boolean hasReferrers()
+ {
+ return false;
}
public void initialise(VirtualHost host, AMQShortString name, boolean durable, boolean autoDelete)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchangeType.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchangeType.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchangeType.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/TestExchangeType.java Tue Oct 20 16:23:01 2009
@@ -40,7 +40,7 @@
return null;
}
- public Exchange newInstance(VirtualHost host, AMQShortString name, boolean durable,
+ public Exchange newInstance(VirtualHost host, AMQShortString name, boolean durable,
int token, boolean autoDelete)
throws AMQException
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Tue Oct 20 16:23:01 2009
@@ -65,6 +65,7 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.ManagementActor;
@@ -80,10 +81,10 @@
private final ExchangeFactory _exchangeFactory;
private final DurableConfigurationStore _durableConfig;
- private final VirtualHost.VirtualHostMBean _virtualHostMBean;
+ private final VirtualHostImpl.VirtualHostMBean _virtualHostMBean;
@MBeanConstructor("Creates the Broker Manager MBean")
- public AMQBrokerManagerMBean(VirtualHost.VirtualHostMBean virtualHostMBean) throws JMException
+ public AMQBrokerManagerMBean(VirtualHostImpl.VirtualHostMBean virtualHostMBean) throws JMException
{
super(ManagedBroker.class, ManagedBroker.TYPE, ManagedBroker.VERSION);
@@ -113,10 +114,10 @@
{
exchangeTypes.add(ex.getName().toString());
}
-
+
return exchangeTypes.toArray(new String[0]);
}
-
+
/**
* Returns a list containing the names of the attributes available for the Queue mbeans.
* @since Qpid JMX API 1.3
@@ -129,12 +130,12 @@
{
attributeList.add(attr);
}
-
+
Collections.sort(attributeList);
return attributeList;
}
-
+
/**
* Returns a List of Object Lists containing the requested attribute values (in the same sequence requested) for each queue in the virtualhost.
* If a particular attribute cant be found or raises an mbean/reflection exception whilst being gathered its value is substituted with the String "-".
@@ -147,22 +148,22 @@
{
return new ArrayList<List<Object>>();
}
-
+
List<List<Object>> queueAttributesList = new ArrayList<List<Object>>(_queueRegistry.getQueues().size());
-
+
int attributesLength = attributes.length;
-
+
for(AMQQueue queue : _queueRegistry.getQueues())
{
AMQQueueMBean mbean = (AMQQueueMBean) queue.getManagedObject();
-
+
if(mbean == null)
{
continue;
}
-
+
List<Object> attributeValues = new ArrayList<Object>(attributesLength);
-
+
for(int i=0; i < attributesLength; i++)
{
try
@@ -174,13 +175,13 @@
attributeValues.add(new String("-"));
}
}
-
+
queueAttributesList.add(attributeValues);
}
-
+
return queueAttributesList;
}
-
+
/**
* Creates new exchange and registers it with the registry.
*
@@ -330,7 +331,7 @@
}
finally
{
- CurrentActor.remove();
+ CurrentActor.remove();
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Tue Oct 20 16:23:01 2009
@@ -43,8 +43,11 @@
import org.apache.qpid.server.subscription.ClientDeliveryMethod;
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.*;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -99,12 +102,7 @@
private final AtomicBoolean _suspended = new AtomicBoolean(false);
- private Transaction _transaction;
-
-
- private final List<RequiredDeliveryException> _returnMessages = new LinkedList<RequiredDeliveryException>();
-
- private MessageHandleFactory _messageHandleFactory = new MessageHandleFactory();
+ private ServerTransaction _transaction;
// Why do we need this reference ? - ritchiem
private final AMQProtocolSession _session;
@@ -161,7 +159,7 @@
public void setPublishFrame(MessagePublishInfo info, final Exchange e) throws AMQException
{
- _currentMessage = new IncomingMessage(_messageStore.getNewMessageId(), info, _session);
+ _currentMessage = new IncomingMessage(info);
_currentMessage.setExchange(e);
}
@@ -183,17 +181,15 @@
_currentMessage.setExpiration();
- routeCurrentMessage();
- MessageMetaData mmd = _currentMessage.routingComplete(_messageStore, _messageHandleFactory);
+ MessageMetaData mmd = _currentMessage.headersReceived();
+ final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(mmd);
+ _currentMessage.setStoredMessage(handle);
- if(_currentMessage.isPersistent())
- {
- final Long messageNumber = _currentMessage.getMessageNumber();
+ routeCurrentMessage();
- _messageStore.storeMessageMetaData(messageNumber, mmd);
- _transaction.addPostCommitAction(new Transaction.Action()
+ _transaction.addPostCommitAction(new ServerTransaction.Action()
{
public void postCommit()
@@ -202,17 +198,9 @@
public void onRollback()
{
- try
- {
- _messageStore.removeMessage(messageNumber);
- }
- catch (AMQException e)
- {
-
- }
+ handle.remove();
}
});
- }
deliverCurrentMessageIfComplete();
@@ -283,16 +271,7 @@
final ContentChunk contentChunk =
_session.getMethodRegistry().getProtocolVersionMethodConverter().convertToContentChunk(contentBody);
- int chunkId = _currentMessage.addContentBodyFrame(contentChunk);
-
- if(_currentMessage.isPersistent())
- {
- final Long messageNumber = _currentMessage.getMessageNumber();
- _messageStore.storeContentBodyChunk(messageNumber, chunkId,
- contentChunk, _currentMessage.allContentReceived());
-
-
- }
+ _currentMessage.addContentBodyFrame(contentChunk);
deliverCurrentMessageIfComplete();
}
@@ -320,6 +299,12 @@
return ++_consumerTag;
}
+
+ public Subscription getSubscription(AMQShortString subscription)
+ {
+ return _tag2SubscriptionMap.get(subscription);
+ }
+
/**
* Subscribe to a queue. We register all subscriptions in the channel so that if the channel is closed we can clean
* up all subscriptions, even if the client does not explicitly unsubscribe from all queues.
@@ -333,11 +318,10 @@
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
- * @throws ConsumerTagNotUniqueException if the tag is not unique
* @throws AMQException if something goes wrong
*/
public AMQShortString subscribeToQueue(AMQShortString tag, AMQQueue queue, boolean acks,
- FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException, ConsumerTagNotUniqueException
+ FieldTable filters, boolean noLocal, boolean exclusive) throws AMQException
{
if (tag == null)
{
@@ -346,7 +330,7 @@
if (_tag2SubscriptionMap.containsKey(tag))
{
- throw new ConsumerTagNotUniqueException();
+ throw new AMQException("Consumer already exists with same tag: " + tag);
}
Subscription subscription =
@@ -532,7 +516,7 @@
if (!unacked.isQueueDeleted())
{
// Mark message redelivered
- unacked.setRedelivered(true);
+ unacked.setRedelivered();
// Ensure message is released for redelivery
unacked.release();
@@ -560,7 +544,7 @@
if (unacked != null)
{
// Mark message redelivered
- unacked.setRedelivered(true);
+ unacked.setRedelivered();
// Ensure message is released for redelivery
if (!unacked.isQueueDeleted())
@@ -655,7 +639,7 @@
// Without any details from the client about what has been processed we have to mark
// all messages in the unacked map as redelivered.
- message.setRedelivered(true);
+ message.setRedelivered();
Subscription sub = message.getDeliveredSubscription();
@@ -696,7 +680,7 @@
long deliveryTag = entry.getKey();
_unacknowledgedMessageMap.remove(deliveryTag);
- message.setRedelivered(true);
+ message.setRedelivered();
message.release();
}
@@ -931,16 +915,8 @@
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
- ServerMessage msg = entry.getMessage();
- if(msg instanceof AMQMessage)
- {
- getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
- deliveryTag, sub.getConsumerTag());
- }
- else
- {
- //TODO - Convert 0-10 Message into 0-8/9 message
- }
+ getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
+ deliveryTag, sub.getConsumerTag());
}
};
@@ -969,15 +945,7 @@
throws AMQException
{
-
-
- final AMQMessageHandle messageHandle = incomingMessage.getMessageHandle();
- final MessagePublishInfo messagePublishInfo = incomingMessage.getMessagePublishInfo();
- final ContentHeaderBody header = incomingMessage.getContentHeader();
-
-
-
- AMQMessage message = new AMQMessage(messageHandle, header, incomingMessage.getSize() ,messagePublishInfo);
+ AMQMessage message = new AMQMessage(incomingMessage.getStoredMessage());
message.setExpiration(incomingMessage.getExpiration());
message.setClientIdentifier(_session);
@@ -985,7 +953,6 @@
}
private boolean checkMessageUserId(ContentHeaderBody header)
- throws UnauthorizedAccessException
{
AMQShortString userID =
header.properties instanceof BasicContentHeaderProperties
@@ -996,7 +963,7 @@
}
- private class MessageDeliveryAction implements Transaction.Action
+ private class MessageDeliveryAction implements ServerTransaction.Action
{
private IncomingMessage _incommingMessage;
private ArrayList<AMQQueue> _destinationQueues;
@@ -1014,7 +981,7 @@
{
final boolean immediate = _incommingMessage.isImmediate();
- Transaction txn = null;
+ ServerTransaction txn = null;
for(AMQQueue queue : _destinationQueues)
{
@@ -1036,7 +1003,7 @@
AMQMessage message = (AMQMessage) entry.getMessage();
_session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
message.getContentHeaderBody(),
- message.getBodyFrameIterator(_session,_channelId),
+ message,
_channelId,
AMQConstant.NO_CONSUMERS.getCode(),
new AMQShortString("Immediate delivery is not possible."));
@@ -1069,7 +1036,7 @@
}
}
- private class MessageAcknowledgeAction implements Transaction.Action
+ private class MessageAcknowledgeAction implements ServerTransaction.Action
{
private final Collection<QueueEntry> _ackedMessages;
@@ -1120,7 +1087,7 @@
}
}
- private class WriteReturnAction implements Transaction.Action
+ private class WriteReturnAction implements ServerTransaction.Action
{
private final AMQConstant _errorCode;
private final IncomingMessage _message;
@@ -1141,7 +1108,7 @@
{
_session.getProtocolOutputConverter().writeReturn(_message.getMessagePublishInfo(),
_message.getContentHeader(),
- new BodyFrameIterator(_session,_channelId,_message),
+ _message,
_channelId,
_errorCode.getCode(),
new AMQShortString(_description));
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java Tue Oct 20 16:23:01 2009
@@ -24,7 +24,7 @@
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.store.TransactionLog;
-import org.apache.qpid.server.txn.Transaction;
+import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -56,8 +56,8 @@
public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException
{
-
- message.setRedelivered(true);
+
+ message.setRedelivered();
final Subscription subscription = message.getDeliveredSubscription();
if (subscription != null)
{
@@ -103,14 +103,14 @@
private void dequeueEntry(final QueueEntry node)
{
- Transaction txn = new AutoCommitTransaction(_transactionLog);
+ ServerTransaction txn = new AutoCommitTransaction(_transactionLog);
dequeueEntry(node, txn);
}
- private void dequeueEntry(final QueueEntry node, Transaction txn)
+ private void dequeueEntry(final QueueEntry node, ServerTransaction txn)
{
txn.dequeue(node.getQueue(), node.getMessage(),
- new Transaction.Action()
+ new ServerTransaction.Action()
{
public void postCommit()
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/Main.java Tue Oct 20 16:23:01 2009
@@ -318,6 +318,9 @@
}
}
+ //TODO - HACK
+ port += 10;
+
String bindAddr = commandLine.getOptionValue("b");
if (bindAddr == null)
{
@@ -363,17 +366,19 @@
_brokerLogger.info("Qpid Broker Ready :" + QpidProperties.getReleaseVersion()
+ " build: " + QpidProperties.getBuildVersion());
- CurrentActor.get().message(BrokerMessages.BRK_1004());
- int port_0_10 = port + 1;
+ int port_0_10 = port - 10;
IApplicationRegistry appRegistry = ApplicationRegistry.getInstance();
final ConnectionDelegate delegate =
new org.apache.qpid.server.transport.ServerConnectionDelegate(appRegistry, bindAddress.getCanonicalHostName());
+
+
+
/*
NetworkDriver driver = new MINANetworkDriver();
- driver.bind(port, new InetAddress[]{bindAddress}, new ProtocolEngineFactory_0_10(delegate),
+ driver.bind(port_0_10, new InetAddress[]{bindAddress}, new ProtocolEngineFactory_0_10(delegate),
serverConfig.getNetworkConfiguration(), null);
ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
new QpidAcceptor(driver,"TCP"));
@@ -381,6 +386,9 @@
*/
+
+
+
// TODO - Fix to use a proper binding
@@ -401,6 +409,9 @@
org.apache.qpid.transport.network.io.IoAcceptor ioa = new org.apache.qpid.transport.network.io.IoAcceptor
("0.0.0.0", port_0_10, cb);
ioa.start();
+
+ CurrentActor.get().message(BrokerMessages.BRK_1004());
+
}
finally
{
@@ -532,13 +543,6 @@
{
LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
- try
- {
- blm.register();
- }
- catch (AMQException e)
- {
- throw new InitException("Unable to initialise the Logging Management MBean: ", e);
- }
+ blm.register();
}
}
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Tue Oct 20 16:23:01 2009
@@ -14,8 +14,8 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.qpid.server.configuration;
@@ -124,7 +124,7 @@
}
catch (IllegalArgumentException e)
{
- // We're on something that doesn't handle SIGHUP, how sad, Windows.
+ // We're on something that doesn't handle SIGHUP, how sad, Windows.
}
}
@@ -221,7 +221,7 @@
String localeString = getConfig().getString(ADVANCED_LOCALE);
// Expecting locale of format langauge_country_variant
- // If the configuration does not have a defined locale use the JVM default
+ // If the configuration does not have a defined locale use the JVM default
if (localeString == null)
{
return Locale.getDefault();
@@ -625,48 +625,48 @@
{
return new NetworkDriverConfiguration()
{
-
+
public Integer getTrafficClass()
{
return null;
}
-
+
public Boolean getTcpNoDelay()
{
// Can't call parent getTcpNoDelay since it just calls this one
return getConfig().getBoolean("connector.tcpNoDelay", true);
}
-
+
public Integer getSoTimeout()
{
return null;
}
-
+
public Integer getSoLinger()
{
return null;
}
-
+
public Integer getSendBufferSize()
{
return getBufferWriteLimit();
}
-
+
public Boolean getReuseAddress()
{
return null;
}
-
+
public Integer getReceiveBufferSize()
{
return getBufferReadLimit();
}
-
+
public Boolean getOOBInline()
{
return null;
}
-
+
public Boolean getKeepAlive()
{
return null;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,6 +23,7 @@
import javax.management.MalformedObjectNameException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
+import javax.management.JMException;
import javax.management.openmbean.OpenType;
import javax.management.openmbean.CompositeType;
import javax.management.openmbean.TabularType;
@@ -46,7 +47,7 @@
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ExchangeLogSubject;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.ExchangeReferrer;
+import org.apache.log4j.Logger;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -86,14 +87,14 @@
protected CompositeType _bindingDataType;
protected TabularType _bindinglistDataType;
protected TabularDataSupport _bindingList;
-
+
public ExchangeMBean() throws NotCompliantMBeanException
{
super(ManagedExchange.class, ManagedExchange.TYPE, ManagedExchange.VERSION);
}
protected void init() throws OpenDataException
- {
+ {
_bindingItemTypes = new OpenType[2];
_bindingItemTypes[0] = SimpleType.STRING;
_bindingItemTypes[1] = new ArrayType(1, SimpleType.STRING);
@@ -162,23 +163,33 @@
* called during initialisation (template method pattern).
* @return the MBean
*/
- protected abstract ExchangeMBean createMBean() throws AMQException;
+ protected abstract ExchangeMBean createMBean() throws JMException;
- public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException
+ public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
+ throws AMQException
{
_virtualHost = host;
_name = name;
_durable = durable;
_autoDelete = autoDelete;
_ticket = ticket;
- _exchangeMbean = createMBean();
- _exchangeMbean.register();
+ try
+ {
+ _exchangeMbean = createMBean();
+ _exchangeMbean.register();
+ }
+ catch (JMException e)
+ {
+ getLogger().error(e);
+ }
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
// Log Exchange creation
CurrentActor.get().message(ExchangeMessages.EXH_1001(String.valueOf(getType()), String.valueOf(name), durable));
}
+ public abstract Logger getLogger();
+
public boolean isDurable()
{
return _durable;
@@ -206,7 +217,7 @@
}
CurrentActor.get().message(_logSubject, ExchangeMessages.EXH_1002());
- }
+ }
public String toString()
{
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,7 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.protocol.ExchangeInitialiser;
+import org.apache.qpid.server.exchange.ExchangeInitialiser;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -40,6 +40,7 @@
* Maps from exchange name to exchange instance
*/
private ConcurrentMap<AMQShortString, Exchange> _exchangeMap = new ConcurrentHashMap<AMQShortString, Exchange>();
+ private ConcurrentMap<String, Exchange> _exchangeMapStr = new ConcurrentHashMap<String, Exchange>();
private Exchange _defaultExchange;
private VirtualHost _host;
@@ -56,10 +57,7 @@
new ExchangeInitialiser().initialise(_host.getExchangeFactory(), this);
}
- public Exchange getExchange(String exchangeName)
- {
- return getExchange(new AMQShortString(exchangeName));
- }
+
public DurableConfigurationStore getDurableConfigurationStore()
{
@@ -69,6 +67,7 @@
public void registerExchange(Exchange exchange) throws AMQException
{
_exchangeMap.put(exchange.getName(), exchange);
+ _exchangeMapStr.put(exchange.getName().toString(), exchange);
if (exchange.isDurable())
{
getDurableConfigurationStore().createExchange(exchange);
@@ -94,6 +93,7 @@
{
// TODO: check inUse argument
Exchange e = _exchangeMap.remove(name);
+ _exchangeMapStr.remove(name.toString());
if (e != null)
{
if (e.isDurable())
@@ -126,6 +126,19 @@
}
+ public Exchange getExchange(String name)
+ {
+ if ((name == null) || name.length() == 0)
+ {
+ return getDefaultExchange();
+ }
+ else
+ {
+ return _exchangeMapStr.get(name);
+ }
+ }
+
+
/**
* Routes content through exchanges, delivering it to 1 or more queues.
* @param payload
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Tue Oct 20 16:23:01 2009
@@ -149,17 +149,14 @@
}// End of MBean class
- protected ExchangeMBean createMBean() throws AMQException
+ protected ExchangeMBean createMBean() throws JMException
{
- try
- {
- return new DirectExchangeMBean();
- }
- catch (JMException ex)
- {
- _logger.error("Exception occured in creating the direct exchange mbean", ex);
- throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
- }
+ return new DirectExchangeMBean();
+ }
+
+ public Logger getLogger()
+ {
+ return _logger;
}
public AMQShortString getType()
@@ -212,7 +209,8 @@
public ArrayList<AMQQueue> route(InboundMessage payload)
{
- final AMQShortString routingKey = payload.getRoutingKey() == null ? AMQShortString.EMPTY_STRING : new AMQShortString(payload.getRoutingKey());
+ final String routingKey = payload.getRoutingKey();
+
final ArrayList<AMQQueue> queues = (routingKey == null) ? null : _index.get(routingKey);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java Tue Oct 20 16:23:01 2009
@@ -27,10 +27,9 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.ExchangeReferrer;
+import javax.management.JMException;
import java.util.ArrayList;
-import java.util.Map;
public interface Exchange extends ExchangeReferrer
{
@@ -38,7 +37,8 @@
AMQShortString getType();
- void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException;
+ void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
+ throws AMQException, JMException;
boolean isDurable();
Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java (from r821930, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java&r1=821930&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ExchangeInitialiser.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,15 +18,11 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
+package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeType;
public class ExchangeInitialiser
{
@@ -35,7 +31,7 @@
{
define (registry, factory, type.getDefaultExchangeName(), type.getName());
}
-
+
define(registry, factory, ExchangeDefaults.DEFAULT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
registry.setDefaultExchange(registry.getExchange(ExchangeDefaults.DEFAULT_EXCHANGE_NAME));
}
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Copied: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeReferrer.java (from r824494, qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java)
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeReferrer.java?p2=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeReferrer.java&p1=qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java&r1=824494&r2=827724&rev=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/ExchangeReferrer.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeReferrer.java Tue Oct 20 16:23:01 2009
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.server;
+package org.apache.qpid.server.exchange;
public interface ExchangeReferrer
{
Propchange: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeReferrer.java
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Tue Oct 20 16:23:01 2009
@@ -62,7 +62,7 @@
private final class FanoutExchangeMBean extends ExchangeMBean
{
private static final String BINDING_KEY_SUBSTITUTE = "*";
-
+
@MBeanConstructor("Creates an MBean for AMQ fanout exchange")
public FanoutExchangeMBean() throws JMException
{
@@ -75,7 +75,7 @@
{
_bindingList = new TabularDataSupport(_bindinglistDataType);
-
+
if(_queues.isEmpty())
{
return _bindingList;
@@ -88,7 +88,7 @@
String queueName = queue.getName().toString();
queueNames.add(queueName);
}
-
+
Object[] bindingItemValues = {BINDING_KEY_SUBSTITUTE, queueNames.toArray(new String[0])};
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, COMPOSITE_ITEM_NAMES, bindingItemValues);
_bindingList.put(bindingData);
@@ -121,17 +121,14 @@
} // End of MBean class
- protected ExchangeMBean createMBean() throws AMQException
+ protected ExchangeMBean createMBean() throws JMException
{
- try
- {
- return new FanoutExchange.FanoutExchangeMBean();
- }
- catch (JMException ex)
- {
- _logger.error("Exception occured in creating the direct exchange mbean", ex);
- throw new AMQException("Exception occured in creating the direct exchange mbean", ex);
- }
+ return new FanoutExchange.FanoutExchangeMBean();
+ }
+
+ public Logger getLogger()
+ {
+ return _logger;
}
public static final ExchangeType<FanoutExchange> TYPE = new ExchangeType<FanoutExchange>()
@@ -202,7 +199,7 @@
public ArrayList<AMQQueue> route(InboundMessage payload)
{
-
+
if (_logger.isDebugEnabled())
{
_logger.debug("Publishing message to queue " + _queues);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Oct 20 16:23:01 2009
@@ -83,6 +83,7 @@
*/
public class HeadersExchange extends AbstractExchange
{
+
private static final Logger _logger = Logger.getLogger(HeadersExchange.class);
public static final ExchangeType<HeadersExchange> TYPE = new ExchangeType<HeadersExchange>()
@@ -102,6 +103,7 @@
boolean autoDelete) throws AMQException
{
HeadersExchange exch = new HeadersExchange();
+
exch.initialise(host, name, durable, ticket, autoDelete);
return exch;
}
@@ -210,7 +212,7 @@
{
throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
}
-
+
if(keyAndValue.length ==1)
{
//no value was given, only a key. Use an empty value
@@ -249,7 +251,7 @@
if(!_bindings.remove(new Registration(args == null ? null : new HeadersBinding(args), queue, routingKey)))
{
throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
- + " with headers args " + args);
+ + " with headers args " + args);
}
}
@@ -320,17 +322,9 @@
return ((BasicContentHeaderProperties) contentHeaderFrame.properties).getHeaders();
}
- protected ExchangeMBean createMBean() throws AMQException
+ protected ExchangeMBean createMBean() throws JMException
{
- try
- {
- return new HeadersExchangeMBean();
- }
- catch (JMException ex)
- {
- _logger.error("Exception occured in creating the HeadersExchangeMBean", ex);
- throw new AMQException("Exception occured in creating the HeadersExchangeMBean", ex);
- }
+ return new HeadersExchangeMBean();
}
public Map<AMQShortString, List<AMQQueue>> getBindings()
@@ -338,6 +332,12 @@
return null;
}
+ public Logger getLogger()
+ {
+ return _logger;
+ }
+
+
private static class Registration
{
private final HeadersBinding binding;
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java Tue Oct 20 16:23:01 2009
@@ -39,6 +39,9 @@
{
private ConcurrentMap<AMQShortString, ArrayList<AMQQueue>> _index
= new ConcurrentHashMap<AMQShortString, ArrayList<AMQQueue>>();
+ private ConcurrentMap<String, ArrayList<AMQQueue>> _stringIndex
+ = new ConcurrentHashMap<String, ArrayList<AMQQueue>>();
+
synchronized boolean add(AMQShortString key, AMQQueue queue)
{
@@ -51,8 +54,10 @@
{
queues = new ArrayList<AMQQueue>(queues);
}
+
//next call is atomic, so there is no race to create the list
_index.put(key, queues);
+ _stringIndex.put(key.toString(), queues);
if(queues.contains(queue))
{
@@ -64,6 +69,8 @@
}
}
+
+
synchronized boolean remove(AMQShortString key, AMQQueue queue)
{
ArrayList<AMQQueue> queues = _index.get(key);
@@ -76,10 +83,12 @@
if (queues.size() == 0)
{
_index.remove(key);
+ _stringIndex.remove(key.toString());
}
else
{
_index.put(key, queues);
+ _stringIndex.put(key.toString(), queues);
}
}
return removed;
@@ -92,6 +101,12 @@
return _index.get(key);
}
+ ArrayList<AMQQueue> get(String key)
+ {
+ return _stringIndex.get(key);
+ }
+
+
Map<AMQShortString, List<AMQQueue>> getBindingsMap()
{
return new HashMap<AMQShortString, List<AMQQueue>>(_index);
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/MessageRouter.java Tue Oct 20 16:23:01 2009
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,7 +21,6 @@
package org.apache.qpid.server.exchange;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.IncomingMessage;
/**
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Tue Oct 20 16:23:01 2009
@@ -228,7 +228,7 @@
_filteredQueues.remove(queue);
}
}
- else
+ else
{
filters.put(filter, instances - 1);
}
@@ -443,10 +443,10 @@
{
result.addUnfilteredQueue(queue);
}
- _parser.addBinding(routingKey, result);
+ _parser.addBinding(routingKey, result);
_topicExchangeResults.put(routingKey,result);
}
- else
+ else
{
if(argumentsContainSelector(args))
{
@@ -490,7 +490,7 @@
{
routingKey = AMQShortString.EMPTY_STRING;
}
-
+
AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>();
@@ -574,7 +574,7 @@
{
return false;
}
-
+
}
}
@@ -642,17 +642,14 @@
}
- protected ExchangeMBean createMBean() throws AMQException
+ protected ExchangeMBean createMBean() throws JMException
{
- try
- {
- return new TopicExchangeMBean();
- }
- catch (JMException ex)
- {
- _logger.error("Exception occured in creating the topic exchenge mbean", ex);
- throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
- }
+ return new TopicExchangeMBean();
+ }
+
+ public Logger getLogger()
+ {
+ return _logger;
}
private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/BooleanExpression.java Tue Oct 20 16:23:01 2009
@@ -20,8 +20,6 @@
// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
//
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
/**
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ComparisonExpression.java Tue Oct 20 16:23:01 2009
@@ -27,8 +27,6 @@
import java.util.List;
import java.util.regex.Pattern;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
/**
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/ConstantExpression.java Tue Oct 20 16:23:01 2009
@@ -25,8 +25,6 @@
import java.math.BigDecimal;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
/**
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/Expression.java Tue Oct 20 16:23:01 2009
@@ -20,8 +20,6 @@
// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
//
-import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
/**
Modified: qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java?rev=827724&r1=827723&r2=827724&view=diff
==============================================================================
--- qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java (original)
+++ qpid/branches/java-broker-0-10/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/FilterManager.java Tue Oct 20 16:23:01 2009
@@ -14,18 +14,16 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.server.filter;
//
// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
//
-import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.Filterable;
-import org.apache.qpid.AMQException;
public interface FilterManager
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org