You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/03/12 16:41:48 UTC
svn commit: r1666224 [1/7] - in /qpid/trunk: ./ qpid/ qpid/cpp/src/
qpid/cpp/src/qpid/broker/ qpid/cpp/src/qpid/management/ qpid/cpp/src/tests/
qpid/java/
qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
qpid/java/bdbstore/src/...
Author: kwall
Date: Thu Mar 12 15:41:46 2015
New Revision: 1666224
URL: http://svn.apache.org/r1666224
Log:
QPID-6429, QPID-6262, QPID-5818: [Java Broker] Utilise NIO, service connections using a thread pool, AMQP model mutating actions should use task executors
Work of Rob Godfrey and myself.
Added:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
- copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
Removed:
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
Modified:
qpid/trunk/ (props changed)
qpid/trunk/qpid/ (props changed)
qpid/trunk/qpid/cpp/src/ (props changed)
qpid/trunk/qpid/cpp/src/CMakeLists.txt (props changed)
qpid/trunk/qpid/cpp/src/qpid/broker/ (props changed)
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (props changed)
qpid/trunk/qpid/cpp/src/tests/ (props changed)
qpid/trunk/qpid/java/ (props changed)
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
qpid/trunk/qpid/java/broker-core/pom.xml
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java
qpid/trunk/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
qpid/trunk/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
qpid/trunk/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java (props changed)
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/LogRecordsRestTest.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
qpid/trunk/qpid/java/test-profiles/CPPExcludes (props changed)
qpid/trunk/qpid/java/test-profiles/JavaExcludes (contents, props changed)
qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes (contents, props changed)
qpid/trunk/qpid/java/test-profiles/test_resources/ (props changed)
qpid/trunk/qpid/java/test-profiles/test_resources/spawned-broker-log4j.xml (props changed)
qpid/trunk/qpid/python/ (props changed)
qpid/trunk/qpid/tools/src/java/ (props changed)
qpid/trunk/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/ (props changed)
Propchange: qpid/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -1,5 +1,6 @@
/qpid/branches/0.5.x-dev:892761,894875
/qpid/branches/0.6-release-windows-installer:926803
+/qpid/branches/QPID-6262-JavaBrokerNIO:1643238-1666219
/qpid/branches/java-broker-bdb-ha2:1576683-1583556
/qpid/branches/java-network-refactor:805429-825319
/qpid/branches/mcpierce-QPID-4719:1477004-1477093
Propchange: qpid/trunk/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -1,6 +1,7 @@
/qpid/branches/0.5.x-dev/qpid:892761,894875
/qpid/branches/0.6-release-windows-installer:926803
/qpid/branches/0.6-release-windows-installer/qpid:926803,927233
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid:1643238-1666219
/qpid/branches/java-broker-bdb-ha2/qpid:1576683-1583556
/qpid/branches/java-network-refactor/qpid:805429-825319
/qpid/branches/mcpierce-QPID-4719/qpid:1477004-1477093
Propchange: qpid/trunk/qpid/cpp/src/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -2,6 +2,7 @@
/qpid/branches/0.6-release-windows-installer/cpp/src:926803
/qpid/branches/0.6-release-windows-installer/qpid/cpp/src:926803,927233
/qpid/branches/QPID-2519/cpp/src:1072051-1079078
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src:1643238-1666219
/qpid/branches/java-broker-bdb-ha2/qpid/cpp/src:1576683-1583556
/qpid/branches/java-network-refactor/qpid/cpp/src:805429-825319
/qpid/branches/qpid-2935/qpid/cpp/src:1061302-1072333
Propchange: qpid/trunk/qpid/cpp/src/CMakeLists.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -2,6 +2,7 @@
/qpid/branches/0.6-release-windows-installer/cpp/src/CMakeLists.txt:926803
/qpid/branches/0.6-release-windows-installer/qpid/cpp/src/CMakeLists.txt:926803,927233,932132
/qpid/branches/QPID-2519/cpp/src/CMakeLists.txt:1072051-1079078
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt:1643238-1666219
/qpid/branches/java-broker-bdb-ha2/qpid/cpp/src/CMakeLists.txt:1576683-1583556
/qpid/branches/java-network-refactor/qpid/cpp/src/CMakeLists.txt:805429-825319
/qpid/branches/qpid-2393/qpid/cpp/src/CMakeLists.txt:1375790-1376954
Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -3,6 +3,7 @@
/qpid/branches/0.6-release-windows-installer/qpid/cpp/src/qpid/broker:926803,927233
/qpid/branches/QPID-2519/cpp/src/qpid/broker:1072051-1079078
/qpid/branches/QPID-3799-acl/cpp/src/qpid/broker:1291401-1295616
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker:1643238-1666219
/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker:805429-825319
/qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker:1061302-1072333
/qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker:1144319-1179855
Propchange: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -1 +1,2 @@
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1643238-1666219
/qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1061302-1072333
Propchange: qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -3,6 +3,7 @@
/qpid/branches/0.6-release-windows-installer/qpid/cpp/src/tests:926803,927233
/qpid/branches/QPID-2519/cpp/src/tests:1072051-1079078
/qpid/branches/QPID-3799-acl/cpp/src/tests:1291401-1295617
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests:1643238-1666219
/qpid/branches/java-broker-bdb-ha2/qpid/cpp/src/tests:1576683-1583556
/qpid/branches/java-network-refactor/qpid/cpp/src/tests:805429-825319
/qpid/branches/qpid-2935/qpid/cpp/src/tests:1061302-1072333
Propchange: qpid/trunk/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -1,6 +1,7 @@
/qpid/branches/0.5.x-dev:886720-886722
/qpid/branches/0.5.x-dev/qpid/java:886720-886722,887145,892761,894875,916304,916325,930288,931179
/qpid/branches/QPID-6125-ProtocolRefactoring/java:1628068-1632579
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java:1643238-1666219
/qpid/branches/java-broker-0-10/qpid/java:795950-829653
/qpid/branches/java-broker-amqp-1-0-management/java:1562456-1569102
/qpid/branches/java-broker-bdb-ha2/qpid/java:1576683-1583556
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Thu Mar 12 15:41:46 2015
@@ -51,7 +51,7 @@ import org.apache.qpid.server.store.Even
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.store.Xid;
@@ -924,14 +924,14 @@ public abstract class AbstractBDBMessage
*
* @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
*/
- private StoreFuture commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
+ private FutureResult commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
{
if (tx == null)
{
throw new StoreException("Fatal internal error: transactional is null at commitTran");
}
- StoreFuture result = getEnvironmentFacade().commit(tx, syncCommit);
+ FutureResult result = getEnvironmentFacade().commit(tx, syncCommit);
if (getLogger().isDebugEnabled())
{
@@ -1386,7 +1386,7 @@ public abstract class AbstractBDBMessage
}
}
- synchronized StoreFuture flushToStore()
+ synchronized FutureResult flushToStore()
{
if(!stored())
{
@@ -1407,7 +1407,7 @@ public abstract class AbstractBDBMessage
storedSizeChangeOccurred(getMetaData().getContentSize());
}
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override
@@ -1526,14 +1526,14 @@ public abstract class AbstractBDBMessage
}
@Override
- public StoreFuture commitTranAsync() throws StoreException
+ public FutureResult commitTranAsync() throws StoreException
{
checkMessageStoreOpen();
doPreCommitActions();
AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
- StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
+ FutureResult futureResult = AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
doPostCommitActions();
- return storeFuture;
+ return futureResult;
}
@Override
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java Thu Mar 12 15:41:46 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.ber
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import com.sleepycat.je.DatabaseException;
@@ -29,7 +30,7 @@ import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
public class CoalescingCommiter implements Committer
{
@@ -65,16 +66,16 @@ public class CoalescingCommiter implemen
}
@Override
- public StoreFuture commit(Transaction tx, boolean syncCommit)
+ public FutureResult commit(Transaction tx, boolean syncCommit)
{
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit);
commitFuture.commit();
return commitFuture;
}
- private static final class BDBCommitFuture implements StoreFuture
+ private static final class BDBCommitFutureResult implements FutureResult
{
- private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class);
private final CommitThread _commitThread;
private final Transaction _tx;
@@ -82,7 +83,7 @@ public class CoalescingCommiter implemen
private RuntimeException _databaseException;
private boolean _complete;
- public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit)
{
_commitThread = commitThread;
_tx = tx;
@@ -162,13 +163,47 @@ public class CoalescingCommiter implemen
LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
}
}
+
+ public synchronized void waitForCompletion(long timeout) throws TimeoutException
+ {
+ long startTime= System.currentTimeMillis();
+ long remaining = timeout;
+
+ while (!isComplete() && remaining > 0)
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ if(!isComplete())
+ {
+ remaining = (startTime + timeout) - System.currentTimeMillis();
+ }
+ }
+
+ if(remaining < 0l)
+ {
+ throw new TimeoutException("commit did not occur within given timeout period: " + timeout);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
+ }
}
/**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
@@ -177,7 +212,7 @@ public class CoalescingCommiter implemen
private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
private final Object _lock = new Object();
private final EnvironmentFacade _environmentFacade;
@@ -244,7 +279,7 @@ public class CoalescingCommiter implemen
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
if (commit == null)
{
break;
@@ -261,7 +296,7 @@ public class CoalescingCommiter implemen
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
if (commit == null)
{
break;
@@ -290,7 +325,7 @@ public class CoalescingCommiter implemen
return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit, final boolean sync)
+ public void addJob(BDBCommitFutureResult commit, final boolean sync)
{
if (_stopped.get())
{
@@ -313,7 +348,7 @@ public class CoalescingCommiter implemen
{
_stopped.set(true);
Environment environment = _environmentFacade.getEnvironment();
- BDBCommitFuture commit;
+ BDBCommitFutureResult commit;
if (environment != null && environment.isValid())
{
environment.flushLog(true);
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java Thu Mar 12 15:41:46 2015
@@ -22,16 +22,17 @@ package org.apache.qpid.server.store.ber
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-
import com.sleepycat.je.CheckpointConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.Transaction;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.util.FutureResult;
public class CommitThreadWrapper
{
@@ -53,16 +54,16 @@ public class CommitThreadWrapper
_commitThread.join();
}
- public StoreFuture commit(Transaction tx, boolean syncCommit)
+ public FutureResult commit(Transaction tx, boolean syncCommit)
{
- BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+ BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit);
commitFuture.commit();
return commitFuture;
}
- private static final class BDBCommitFuture implements StoreFuture
+ private static final class BDBCommitFutureResult implements FutureResult
{
- private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+ private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class);
private final CommitThread _commitThread;
private final Transaction _tx;
@@ -70,7 +71,7 @@ public class CommitThreadWrapper
private boolean _complete;
private boolean _syncCommit;
- public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+ public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit)
{
_commitThread = commitThread;
_tx = tx;
@@ -150,13 +151,48 @@ public class CommitThreadWrapper
LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
}
}
+
+ @Override
+ public void waitForCompletion(final long timeout) throws TimeoutException
+ {
+ long startTime = System.currentTimeMillis();
+ long remaining = timeout;
+
+ while (!isComplete() && remaining > 0)
+ {
+ _commitThread.explicitNotify();
+ try
+ {
+ wait(remaining);
+ }
+ catch (InterruptedException e)
+ {
+ throw new StoreException(e);
+ }
+ if(!isComplete())
+ {
+ remaining = (startTime + timeout) - System.currentTimeMillis();
+ }
+ }
+
+ if(remaining < 0)
+ {
+ throw new TimeoutException("Commit did not complete within required timeout: " + timeout);
+ }
+
+ if(LOGGER.isDebugEnabled())
+ {
+ long duration = System.currentTimeMillis() - startTime;
+ LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+ }
+ }
}
/**
- * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+ * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
- * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+ * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#abort} methods.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
*/
@@ -165,7 +201,7 @@ public class CommitThreadWrapper
private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+ private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
private final CheckpointConfig _config = new CheckpointConfig();
private final Object _lock = new Object();
private Environment _environment;
@@ -230,7 +266,7 @@ public class CommitThreadWrapper
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
commit.complete();
}
@@ -243,7 +279,7 @@ public class CommitThreadWrapper
for(int i = 0; i < size; i++)
{
- BDBCommitFuture commit = _jobQueue.poll();
+ BDBCommitFutureResult commit = _jobQueue.poll();
commit.abort(e);
}
}
@@ -268,7 +304,7 @@ public class CommitThreadWrapper
return !_jobQueue.isEmpty();
}
- public void addJob(BDBCommitFuture commit, final boolean sync)
+ public void addJob(BDBCommitFutureResult commit, final boolean sync)
{
_jobQueue.add(commit);
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java Thu Mar 12 15:41:46 2015
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.store.StoreFuture;
-
import com.sleepycat.je.Transaction;
+import org.apache.qpid.server.util.FutureResult;
+
public interface Committer
{
void start();
- StoreFuture commit(Transaction tx, boolean syncCommit);
+ FutureResult commit(Transaction tx, boolean syncCommit);
void stop();
-}
\ No newline at end of file
+}
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java Thu Mar 12 15:41:46 2015
@@ -27,14 +27,13 @@ import java.util.Map;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.EnvironmentConfig;
import com.sleepycat.je.Sequence;
import com.sleepycat.je.SequenceConfig;
import com.sleepycat.je.Transaction;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
public interface EnvironmentFacade
{
@@ -55,7 +54,7 @@ public interface EnvironmentFacade
Transaction beginTransaction();
- StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync);
+ FutureResult commit(com.sleepycat.je.Transaction tx, boolean sync);
RuntimeException handleDatabaseException(String contextMessage, RuntimeException e);
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Thu Mar 12 15:41:46 2015
@@ -38,7 +38,7 @@ import com.sleepycat.je.Transaction;
import org.apache.log4j.Logger;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.berkeleydb.logging.Log4jLoggingHandler;
public class StandardEnvironmentFacade implements EnvironmentFacade
@@ -127,7 +127,7 @@ public class StandardEnvironmentFacade i
}
@Override
- public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
+ public FutureResult commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
{
try
{
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Thu Mar 12 15:41:46 2015
@@ -73,7 +73,7 @@ import org.codehaus.jackson.map.ObjectMa
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
import org.apache.qpid.server.store.berkeleydb.BDBUtils;
import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry;
@@ -163,6 +163,8 @@ public class ReplicatedEnvironmentFacade
* with NO_SYN durability in case if such Node crushes.
*/
put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
+
+ put(ReplicationConfig.CONSISTENCY_POLICY, "TimeConsistencyPolicy(1 s,30 s)");
}});
public static final String PERMITTED_NODE_LIST = "permittedNodes";
@@ -265,7 +267,7 @@ public class ReplicatedEnvironmentFacade
}
@Override
- public StoreFuture commit(final Transaction tx, boolean syncCommit)
+ public FutureResult commit(final Transaction tx, boolean syncCommit)
{
try
{
@@ -283,7 +285,7 @@ public class ReplicatedEnvironmentFacade
{
return _coalescingCommiter.commit(tx, syncCommit);
}
- return StoreFuture.IMMEDIATE_FUTURE;
+ return FutureResult.IMMEDIATE_FUTURE;
}
@Override
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java Thu Mar 12 15:41:46 2015
@@ -24,9 +24,11 @@ package org.apache.qpid.server.virtualho
import java.util.Map;
import java.util.Set;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
import com.sleepycat.je.rep.MasterStateException;
-
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.HighAvailabilityMessages;
@@ -126,7 +128,7 @@ public class BDBHARemoteReplicationNodeI
}
@StateTransition(currentState = {State.ACTIVE, State.UNAVAILABLE}, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
String nodeName = getName();
@@ -146,6 +148,8 @@ public class BDBHARemoteReplicationNodeI
{
throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e);
}
+
+ return Futures.immediateFuture(null);
}
protected void afterSetRole()
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Thu Mar 12 15:41:46 2015
@@ -42,6 +42,10 @@ import java.util.concurrent.atomic.Atomi
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.LogWriteException;
import com.sleepycat.je.rep.NodeState;
@@ -318,7 +322,7 @@ public class BDBHAVirtualHostNodeImpl ex
}
@Override
- protected void activate()
+ protected ListenableFuture<Void> activate()
{
if (LOGGER.isDebugEnabled())
{
@@ -352,6 +356,7 @@ public class BDBHAVirtualHostNodeImpl ex
{
getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress));
shutdownOnIntruder(nodeAddress);
+
throw new IllegalStateException("Intruder node detected: " + nodeAddress);
}
}
@@ -367,24 +372,49 @@ public class BDBHAVirtualHostNodeImpl ex
environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer());
environmentFacade.setPermittedNodes(_permittedNodes);
}
+
+ return Futures.immediateFuture(null);
}
@StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
- protected void doStop()
+ protected ListenableFuture<Void> doStop()
{
- try
- {
- super.doStop();
- }
- finally
+ final SettableFuture<Void> returnVal = SettableFuture.create();
+
+ ListenableFuture<Void> superFuture = super.doStop();
+ Futures.addCallback(superFuture, new FutureCallback<Void>()
{
- closeEnvironment();
+ @Override
+ public void onSuccess(final Void result)
+ {
+ doFinally();
+ }
- // closing the environment does not cause a state change. Adjust the role
- // so that our observers will see DETACHED rather than our previous role in the group.
- _lastRole.set(NodeRole.DETACHED);
- attributeSet(ROLE, _role, NodeRole.DETACHED);
- }
+ @Override
+ public void onFailure(final Throwable t)
+ {
+ doFinally();
+ }
+
+ private void doFinally()
+ {
+ try
+ {
+ closeEnvironment();
+
+ // closing the environment does not cause a state change. Adjust the role
+ // so that our observers will see DETACHED rather than our previous role in the group.
+ _lastRole.set(NodeRole.DETACHED);
+ attributeSet(ROLE, _role, NodeRole.DETACHED);
+ }
+ finally
+ {
+ returnVal.set(null);
+ }
+
+ }
+ });
+ return returnVal;
}
private void closeEnvironment()
@@ -397,43 +427,52 @@ public class BDBHAVirtualHostNodeImpl ex
}
@StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
- protected void doDelete()
+ protected ListenableFuture<Void> doDelete()
{
- // get helpers before close. on close all children are closed and not available anymore
- Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
- super.doDelete();
-
- if (getConfigurationStore() != null)
- {
- getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
- }
- if (getState() == State.DELETED && !helpers.isEmpty())
+ // get helpers before close. on close all children are closed and not available anymore
+ final Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
+ return doAfter(super.doDelete(),new Runnable()
{
- try
- {
- new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName());
- }
- catch(DatabaseException e)
+ @Override
+ public void run()
{
- LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage()
- + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
+ if (getConfigurationStore() != null)
+ {
+ getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
+ }
+
+ if (getState() == State.DELETED && !helpers.isEmpty())
+ {
+ try
+ {
+ new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName());
+ }
+ catch(DatabaseException e)
+ {
+ LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage()
+ + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
+ }
+ }
+
}
- }
+ });
+
+
}
@Override
- protected void deleteVirtualHostIfExists()
+ protected ListenableFuture<Void> deleteVirtualHostIfExists()
{
ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getReplicatedEnvironmentFacade();
if (replicatedEnvironmentFacade != null && replicatedEnvironmentFacade.isMaster()
&& replicatedEnvironmentFacade.getNumberOfElectableGroupMembers() == 1)
{
- super.deleteVirtualHostIfExists();
+ return super.deleteVirtualHostIfExists();
}
else
{
- closeVirtualHostIfExist();
+ return closeVirtualHostIfExist();
}
}
@@ -553,7 +592,7 @@ public class BDBHAVirtualHostNodeImpl ex
{
try
{
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
getConfigurationStore().upgradeStoreStructure();
@@ -640,7 +679,7 @@ public class BDBHAVirtualHostNodeImpl ex
{
try
{
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
Map<String, Object> hostAttributes = new HashMap<>();
hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION);
@@ -654,13 +693,24 @@ public class BDBHAVirtualHostNodeImpl ex
}
}
- protected void closeVirtualHostIfExist()
+ protected ListenableFuture<Void> closeVirtualHostIfExist()
{
- VirtualHost<?,?,?> virtualHost = getVirtualHost();
+ final VirtualHost<?,?,?> virtualHost = getVirtualHost();
if (virtualHost!= null)
{
- virtualHost.close();
- childRemoved(virtualHost);
+ return doAfter(virtualHost.closeAsync(), new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ childRemoved(virtualHost);
+
+ }
+ });
+ }
+ else
+ {
+ return Futures.immediateFuture(null);
}
}
@@ -687,15 +737,19 @@ public class BDBHAVirtualHostNodeImpl ex
onReplica();
break;
case DETACHED:
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
break;
case UNKNOWN:
- closeVirtualHostIfExist();
+ closeVirtualHostIfExist().get();
break;
default:
LOGGER.error("Unexpected state change: " + state);
}
}
+ catch (InterruptedException | ExecutionException e)
+ {
+ throw new ServerScopedRuntimeException(e);
+ }
finally
{
NodeRole newRole = NodeRole.fromJeState(state);
@@ -1137,7 +1191,7 @@ public class BDBHAVirtualHostNodeImpl ex
try
{
- close();
+ closeAsync();
}
finally
{
Modified: qpid/trunk/qpid/java/broker-core/pom.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/pom.xml?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/pom.xml (original)
+++ qpid/trunk/qpid/java/broker-core/pom.xml Thu Mar 12 15:41:46 2015
@@ -107,8 +107,14 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava-version}</version>
+ </dependency>
+
<!-- test dependencies -->
- <dependency>
+ <dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-test-utils</artifactId>
<version>${project.version}</version>
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java Thu Mar 12 15:41:46 2015
@@ -29,9 +29,13 @@ import java.security.PrivilegedException
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.security.auth.Subject;
+import com.google.common.util.concurrent.ListenableFuture;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
@@ -106,11 +110,16 @@ public class Broker implements BrokerShu
{
if(_systemConfig != null)
{
- _systemConfig.close();
+ ListenableFuture<Void> closeResult = _systemConfig.closeAsync();
+ closeResult.get(30000l, TimeUnit.MILLISECONDS);
}
_taskExecutor.stop();
}
+ catch (TimeoutException | InterruptedException | ExecutionException e)
+ {
+ LOGGER.warn("Attempting to cleanly shutdown took too long, exiting immediately");
+ }
finally
{
if (_configuringOwnLogging)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java Thu Mar 12 15:41:46 2015
@@ -28,6 +28,9 @@ import java.util.concurrent.CopyOnWriteA
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.configuration.updater.VoidTask;
import org.apache.qpid.server.exchange.AbstractExchange;
@@ -196,7 +199,7 @@ public class BindingImpl
}
@StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
- private void doDelete()
+ private ListenableFuture<Void> doDelete()
{
if(_deleted.compareAndSet(false,true))
{
@@ -209,12 +212,14 @@ public class BindingImpl
deleted();
setState(State.DELETED);
+ return Futures.immediateFuture(null);
}
@StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
- private void activate()
+ private ListenableFuture<Void> activate()
{
setState(State.ACTIVE);
+ return Futures.immediateFuture(null);
}
public void addStateChangeListener(StateChangeListener<BindingImpl,State> listener)
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java Thu Mar 12 15:41:46 2015
@@ -21,6 +21,7 @@
package org.apache.qpid.server.configuration.updater;
import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
import java.util.concurrent.Future;
public interface TaskExecutor
@@ -43,4 +44,7 @@ public interface TaskExecutor
<T> Future<T> submit(Task<T> task) throws CancellationException;
+ boolean isTaskExecutorThread();
+
+ Executor getExecutor();
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java Thu Mar 12 15:41:46 2015
@@ -26,6 +26,7 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -48,6 +49,7 @@ public class TaskExecutorImpl implements
private volatile Thread _taskThread;
private final AtomicBoolean _running = new AtomicBoolean();
private volatile ExecutorService _executor;
+ private final ImmediateIfSameThreadExecutor _wrappedExecutor = new ImmediateIfSameThreadExecutor();
@Override
@@ -67,7 +69,7 @@ public class TaskExecutorImpl implements
@Override
public Thread newThread(Runnable r)
{
- _taskThread = new Thread(r, TASK_EXECUTION_THREAD_NAME);
+ _taskThread = new TaskThread(r, TASK_EXECUTION_THREAD_NAME, TaskExecutorImpl.this);
return _taskThread;
}
});
@@ -277,7 +279,13 @@ public class TaskExecutorImpl implements
}
}
- private boolean isTaskExecutorThread()
+ @Override
+ public Executor getExecutor()
+ {
+ return _wrappedExecutor;
+ }
+
+ public boolean isTaskExecutorThread()
{
return Thread.currentThread() == _taskThread;
}
@@ -373,4 +381,41 @@ public class TaskExecutorImpl implements
return get();
}
}
+
+ private class ImmediateIfSameThreadExecutor implements Executor
+ {
+
+ @Override
+ public void execute(final Runnable command)
+ {
+ if(isTaskExecutorThread()
+ || (_executor == null && (Thread.currentThread() instanceof TaskThread
+ && ((TaskThread)Thread.currentThread()).getTaskExecutor() == TaskExecutorImpl.this)))
+ {
+ command.run();
+ }
+ else
+ {
+ _executor.execute(command);
+ }
+
+ }
+ }
+
+ private static class TaskThread extends Thread
+ {
+
+ private final TaskExecutorImpl _taskExecutor;
+
+ public TaskThread(final Runnable r, final String name, final TaskExecutorImpl taskExecutor)
+ {
+ super(r, name);
+ _taskExecutor = taskExecutor;
+ }
+
+ public TaskExecutorImpl getTaskExecutor()
+ {
+ return _taskExecutor;
+ }
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java Thu Mar 12 15:41:46 2015
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.server.connection;
+import java.net.SocketAddress;
+
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.security.auth.SocketConnectionPrincipal;
-import java.net.SocketAddress;
-
public class ConnectionPrincipal implements SocketConnectionPrincipal
{
private final AMQConnectionModel _connection;
@@ -51,6 +52,11 @@ public class ConnectionPrincipal impleme
return _connection;
}
+ public VirtualHost<?,?,?> getVirtualHost()
+ {
+ return _connection.getVirtualHost();
+ }
+
@Override
public boolean equals(final Object o)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Thu Mar 12 15:41:46 2015
@@ -74,7 +74,7 @@ public class ConnectionRegistry implemen
AMQConnectionModel connection = itr.next();
try
{
- connection.close(AMQConstant.CONNECTION_FORCED, replyText);
+ connection.closeAsync(AMQConstant.CONNECTION_FORCED, replyText);
}
catch (Exception e)
{
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Mar 12 15:41:46 2015
@@ -23,17 +23,21 @@ package org.apache.qpid.server.consumer;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.util.StateChangeListener;
public abstract class AbstractConsumerTarget implements ConsumerTarget
{
-
+ private static final Logger LOGGER = Logger.getLogger(AbstractConsumerTarget.class);
private final AtomicReference<State> _state;
private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
@@ -41,6 +45,7 @@ public abstract class AbstractConsumerTa
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicInteger _stateActivates = new AtomicInteger();
+ private ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue();
protected AbstractConsumerTarget(final State initialState)
@@ -48,6 +53,26 @@ public abstract class AbstractConsumerTa
_state = new AtomicReference<State>(initialState);
}
+ @Override
+ public void processPending()
+ {
+ while(hasMessagesToSend())
+ {
+ sendNextMessage();
+ }
+
+ processClosed();
+ }
+
+ protected abstract void processClosed();
+
+ @Override
+ public final boolean isSuspended()
+ {
+ return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended();
+ }
+
+ protected abstract boolean doIsSuspended();
public final State getState()
{
@@ -101,6 +126,7 @@ public abstract class AbstractConsumerTa
}
}
+ @Override
public final void notifyCurrentState()
{
@@ -136,4 +162,41 @@ public abstract class AbstractConsumerTa
_stateChangeLock.unlock();
}
+ @Override
+ public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+ {
+ _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
+
+ getSessionModel().getConnectionModel().notifyWork();
+ return entry.getMessage().getSize();
+ }
+
+ protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+
+ @Override
+ public boolean hasMessagesToSend()
+ {
+ return !_queue.isEmpty();
+ }
+
+ @Override
+ public void sendNextMessage()
+ {
+ ConsumerMessageInstancePair consumerMessage = _queue.peek();
+ if (consumerMessage != null)
+ {
+ _queue.poll();
+
+ ConsumerImpl consumer = consumerMessage.getConsumer();
+ MessageInstance entry = consumerMessage.getEntry();
+ boolean batch = consumerMessage.isBatch();
+ doSend(consumer, entry, batch);
+
+ if (consumer.acquires())
+ {
+ entry.unlockAcquisition();
+ }
+ }
+
+ }
}
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Thu Mar 12 15:41:46 2015
@@ -22,6 +22,8 @@ package org.apache.qpid.server.consumer;
import java.util.concurrent.atomic.AtomicLong;
+import com.google.common.util.concurrent.ListenableFuture;
+
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -31,6 +33,8 @@ public interface ConsumerImpl
void externalStateChange();
+ ConsumerTarget getTarget();
+
enum Option
{
ACQUIRES,
Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Thu Mar 12 15:41:46 2015
@@ -33,6 +33,8 @@ public interface ConsumerTarget
void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
+ void processPending();
+
enum State
{
ACTIVE, SUSPENDED, CLOSED
@@ -44,6 +46,8 @@ public interface ConsumerTarget
void consumerRemoved(ConsumerImpl sub);
+ void notifyCurrentState();
+
void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
long getUnacknowledgedBytes();
@@ -54,6 +58,10 @@ public interface ConsumerTarget
long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+ boolean hasMessagesToSend();
+
+ void sendNextMessage();
+
void flushBatched();
void queueDeleted();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org