You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/03/12 16:41:48 UTC

svn commit: r1666224 [1/7] - in /qpid/trunk: ./ qpid/ qpid/cpp/src/ qpid/cpp/src/qpid/broker/ qpid/cpp/src/qpid/management/ qpid/cpp/src/tests/ qpid/java/ qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ qpid/java/bdbstore/src/...

Author: kwall
Date: Thu Mar 12 15:41:46 2015
New Revision: 1666224

URL: http://svn.apache.org/r1666224
Log:
QPID-6429, QPID-6262, QPID-5818: [Java Broker] Utilise NIO, service connections using a thread pool, AMQP model mutating actions should use task executors

Work of Rob Godfrey and myself.


Added:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerMessageInstancePair.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/CloseFuture.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/util/FutureResult.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostUnavailableException.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerAssembler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerDisassembler.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerEncoder.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageOnlyCreditManager.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/NoAckCreditManager.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/Pre0_10CreditManager.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/LimitlessCreditManager.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferReceiver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/NetworkEventReceiver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventReceiver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolEventSender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/TransportEncryption.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
      - copied unchanged from r1666219, qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/AbstractNetworkTransport.java
Removed:
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/BytesOnlyCreditManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/LimitlessCreditManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageAndBytesCreditManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/MessageOnlyCreditManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/Pre0_10CreditManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoreFuture.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ServerProtocolEngine.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Receiver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Sender.java
Modified:
    qpid/trunk/   (props changed)
    qpid/trunk/qpid/   (props changed)
    qpid/trunk/qpid/cpp/src/   (props changed)
    qpid/trunk/qpid/cpp/src/CMakeLists.txt   (props changed)
    qpid/trunk/qpid/cpp/src/qpid/broker/   (props changed)
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp   (props changed)
    qpid/trunk/qpid/cpp/src/tests/   (props changed)
    qpid/trunk/qpid/java/   (props changed)
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
    qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
    qpid/trunk/qpid/java/broker-core/pom.xml
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/flow/FlowCreditManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObjectTypeFactory.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractSystemConfig.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Binding.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectTypeRegistry.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Port.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/VirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileSystemPreferencesProviderImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AbstractPort.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ConfiguredObjectTypeFactory.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactory.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueFactory.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileKeyStoreImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/FileTrustStoreImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaKeyStoreImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/NonJavaTrustStoreImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ConfigModelPasswordManagingAuthenticationProvider.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ManagedUser.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupMemberImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/GroupProviderImpl.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/Transaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/TCPandSSLTransport.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
    qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/RedirectingVirtualHostNodeImpl.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/updater/CurrentThreadTaskExecutor.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/actors/BaseConnectionActorTestCase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/VirtualHostTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/port/AmqpPortImplTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/model/testmodels/lifecycle/TestConfiguredObject.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/AsyncAutoCommitTransactionTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/txn/MockStoreTransaction.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/AbstractVirtualHostTest.java
    qpid/trunk/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNodeTest.java
    qpid/trunk/qpid/java/broker-plugins/access-control/src/main/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderImpl.java
    qpid/trunk/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/ACLFileAccessControlProviderFactoryTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/CreditCreditManager.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManager.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-10-protocol/src/test/java/org/apache/qpid/server/protocol/v0_10/WindowCreditManagerTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/pom.xml
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
    qpid/trunk/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
    qpid/trunk/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/trunk/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericAbstractJDBCMessageStore.java
    qpid/trunk/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
    qpid/trunk/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
    qpid/trunk/qpid/java/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/HttpManagementTest.java
    qpid/trunk/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/JMXManagementPluginImpl.java
    qpid/trunk/qpid/java/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/AMQSession_0_10Test.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/MockSender.java
    qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Binding.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/AbstractEncoder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Encoder.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/IncomingNetworkTransport.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/InputHandler.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutgoingNetworkTransport.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoReceiver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayer.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/SecurityLayerFactory.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLReceiver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLBufferingSender.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLReceiver.java
    qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/TransportTest.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoAcceptor.java
    qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IoTransport.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/InternalBrokerHolder.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java   (props changed)
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/AsynchMessageListenerTest.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/client/ssl/SSLTest.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/systest/management/jmx/ConnectionManagementTest.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/systest/rest/LogRecordsRestTest.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/client/QueueBrowserAutoAckTest.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
    qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
    qpid/trunk/qpid/java/test-profiles/CPPExcludes   (props changed)
    qpid/trunk/qpid/java/test-profiles/JavaExcludes   (contents, props changed)
    qpid/trunk/qpid/java/test-profiles/JavaTransientExcludes   (contents, props changed)
    qpid/trunk/qpid/java/test-profiles/test_resources/   (props changed)
    qpid/trunk/qpid/java/test-profiles/test_resources/spawned-broker-log4j.xml   (props changed)
    qpid/trunk/qpid/python/   (props changed)
    qpid/trunk/qpid/tools/src/java/   (props changed)
    qpid/trunk/qpid/tools/src/java/qpid-qmf2-tools/src/main/java/org/apache/qpid/qmf2/tools/   (props changed)

Propchange: qpid/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -1,5 +1,6 @@
 /qpid/branches/0.5.x-dev:892761,894875
 /qpid/branches/0.6-release-windows-installer:926803
+/qpid/branches/QPID-6262-JavaBrokerNIO:1643238-1666219
 /qpid/branches/java-broker-bdb-ha2:1576683-1583556
 /qpid/branches/java-network-refactor:805429-825319
 /qpid/branches/mcpierce-QPID-4719:1477004-1477093

Propchange: qpid/trunk/qpid/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -1,6 +1,7 @@
 /qpid/branches/0.5.x-dev/qpid:892761,894875
 /qpid/branches/0.6-release-windows-installer:926803
 /qpid/branches/0.6-release-windows-installer/qpid:926803,927233
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid:1643238-1666219
 /qpid/branches/java-broker-bdb-ha2/qpid:1576683-1583556
 /qpid/branches/java-network-refactor/qpid:805429-825319
 /qpid/branches/mcpierce-QPID-4719/qpid:1477004-1477093

Propchange: qpid/trunk/qpid/cpp/src/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -2,6 +2,7 @@
 /qpid/branches/0.6-release-windows-installer/cpp/src:926803
 /qpid/branches/0.6-release-windows-installer/qpid/cpp/src:926803,927233
 /qpid/branches/QPID-2519/cpp/src:1072051-1079078
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src:1643238-1666219
 /qpid/branches/java-broker-bdb-ha2/qpid/cpp/src:1576683-1583556
 /qpid/branches/java-network-refactor/qpid/cpp/src:805429-825319
 /qpid/branches/qpid-2935/qpid/cpp/src:1061302-1072333

Propchange: qpid/trunk/qpid/cpp/src/CMakeLists.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -2,6 +2,7 @@
 /qpid/branches/0.6-release-windows-installer/cpp/src/CMakeLists.txt:926803
 /qpid/branches/0.6-release-windows-installer/qpid/cpp/src/CMakeLists.txt:926803,927233,932132
 /qpid/branches/QPID-2519/cpp/src/CMakeLists.txt:1072051-1079078
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/CMakeLists.txt:1643238-1666219
 /qpid/branches/java-broker-bdb-ha2/qpid/cpp/src/CMakeLists.txt:1576683-1583556
 /qpid/branches/java-network-refactor/qpid/cpp/src/CMakeLists.txt:805429-825319
 /qpid/branches/qpid-2393/qpid/cpp/src/CMakeLists.txt:1375790-1376954

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -3,6 +3,7 @@
 /qpid/branches/0.6-release-windows-installer/qpid/cpp/src/qpid/broker:926803,927233
 /qpid/branches/QPID-2519/cpp/src/qpid/broker:1072051-1079078
 /qpid/branches/QPID-3799-acl/cpp/src/qpid/broker:1291401-1295616
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/broker:1643238-1666219
 /qpid/branches/java-network-refactor/qpid/cpp/src/qpid/broker:805429-825319
 /qpid/branches/qpid-2935/qpid/cpp/src/qpid/broker:1061302-1072333
 /qpid/branches/qpid-3346/qpid/cpp/src/qpid/broker:1144319-1179855

Propchange: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -1 +1,2 @@
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1643238-1666219
 /qpid/branches/qpid-2935/qpid/cpp/src/qpid/management/ManagementAgent.cpp:1061302-1072333

Propchange: qpid/trunk/qpid/cpp/src/tests/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -3,6 +3,7 @@
 /qpid/branches/0.6-release-windows-installer/qpid/cpp/src/tests:926803,927233
 /qpid/branches/QPID-2519/cpp/src/tests:1072051-1079078
 /qpid/branches/QPID-3799-acl/cpp/src/tests:1291401-1295617
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/cpp/src/tests:1643238-1666219
 /qpid/branches/java-broker-bdb-ha2/qpid/cpp/src/tests:1576683-1583556
 /qpid/branches/java-network-refactor/qpid/cpp/src/tests:805429-825319
 /qpid/branches/qpid-2935/qpid/cpp/src/tests:1061302-1072333

Propchange: qpid/trunk/qpid/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Mar 12 15:41:46 2015
@@ -1,6 +1,7 @@
 /qpid/branches/0.5.x-dev:886720-886722
 /qpid/branches/0.5.x-dev/qpid/java:886720-886722,887145,892761,894875,916304,916325,930288,931179
 /qpid/branches/QPID-6125-ProtocolRefactoring/java:1628068-1632579
+/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java:1643238-1666219
 /qpid/branches/java-broker-0-10/qpid/java:795950-829653
 /qpid/branches/java-broker-amqp-1-0-management/java:1562456-1569102
 /qpid/branches/java-broker-bdb-ha2/qpid/java:1576683-1583556

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java Thu Mar 12 15:41:46 2015
@@ -51,7 +51,7 @@ import org.apache.qpid.server.store.Even
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StorableMessageMetaData;
 import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
 import org.apache.qpid.server.store.StoredMessage;
 import org.apache.qpid.server.store.TransactionLogResource;
 import org.apache.qpid.server.store.Xid;
@@ -924,14 +924,14 @@ public abstract class AbstractBDBMessage
      *
      * @throws org.apache.qpid.server.store.StoreException If the operation fails for any reason.
      */
-    private StoreFuture commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
+    private FutureResult commitTranImpl(final Transaction tx, boolean syncCommit) throws StoreException
     {
         if (tx == null)
         {
             throw new StoreException("Fatal internal error: transactional is null at commitTran");
         }
 
-        StoreFuture result = getEnvironmentFacade().commit(tx, syncCommit);
+        FutureResult result = getEnvironmentFacade().commit(tx, syncCommit);
 
         if (getLogger().isDebugEnabled())
         {
@@ -1386,7 +1386,7 @@ public abstract class AbstractBDBMessage
             }
         }
 
-        synchronized StoreFuture flushToStore()
+        synchronized FutureResult flushToStore()
         {
             if(!stored())
             {
@@ -1407,7 +1407,7 @@ public abstract class AbstractBDBMessage
 
                 storedSizeChangeOccurred(getMetaData().getContentSize());
             }
-            return StoreFuture.IMMEDIATE_FUTURE;
+            return FutureResult.IMMEDIATE_FUTURE;
         }
 
         @Override
@@ -1526,14 +1526,14 @@ public abstract class AbstractBDBMessage
         }
 
         @Override
-        public StoreFuture commitTranAsync() throws StoreException
+        public FutureResult commitTranAsync() throws StoreException
         {
             checkMessageStoreOpen();
             doPreCommitActions();
             AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease);
-            StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
+            FutureResult futureResult = AbstractBDBMessageStore.this.commitTranImpl(_txn, false);
             doPostCommitActions();
-            return storeFuture;
+            return futureResult;
         }
 
         @Override

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CoalescingCommiter.java Thu Mar 12 15:41:46 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.server.store.ber
 
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.sleepycat.je.DatabaseException;
@@ -29,7 +30,7 @@ import com.sleepycat.je.Environment;
 import com.sleepycat.je.Transaction;
 import org.apache.log4j.Logger;
 
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
 
 public class CoalescingCommiter implements Committer
 {
@@ -65,16 +66,16 @@ public class CoalescingCommiter implemen
     }
 
     @Override
-    public StoreFuture commit(Transaction tx, boolean syncCommit)
+    public FutureResult commit(Transaction tx, boolean syncCommit)
     {
-        BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+        BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit);
         commitFuture.commit();
         return commitFuture;
     }
 
-    private static final class BDBCommitFuture implements StoreFuture
+    private static final class BDBCommitFutureResult implements FutureResult
     {
-        private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+        private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class);
 
         private final CommitThread _commitThread;
         private final Transaction _tx;
@@ -82,7 +83,7 @@ public class CoalescingCommiter implemen
         private RuntimeException _databaseException;
         private boolean _complete;
 
-        public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+        public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit)
         {
             _commitThread = commitThread;
             _tx = tx;
@@ -162,13 +163,47 @@ public class CoalescingCommiter implemen
                 LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
             }
         }
+
+        public synchronized void waitForCompletion(long timeout) throws TimeoutException
+        {
+            long startTime= System.currentTimeMillis();
+            long remaining = timeout;
+
+            while (!isComplete() && remaining > 0)
+            {
+                _commitThread.explicitNotify();
+                try
+                {
+                    wait(remaining);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                if(!isComplete())
+                {
+                    remaining = (startTime + timeout) - System.currentTimeMillis();
+                }
+            }
+
+            if(remaining < 0l)
+            {
+                throw new TimeoutException("commit did not occur within given timeout period: " + timeout);
+            }
+
+            if(LOGGER.isDebugEnabled())
+            {
+                long duration = System.currentTimeMillis() - startTime;
+                LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+            }
+        }
     }
 
     /**
-     * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+     * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult} operations. The commit operations
      * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
      * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
-     * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+     * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CoalescingCommiter.BDBCommitFutureResult#abort} methods.
      *
      * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
      */
@@ -177,7 +212,7 @@ public class CoalescingCommiter implemen
         private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
 
         private final AtomicBoolean _stopped = new AtomicBoolean(false);
-        private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+        private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
         private final Object _lock = new Object();
         private final EnvironmentFacade _environmentFacade;
 
@@ -244,7 +279,7 @@ public class CoalescingCommiter implemen
 
                 for(int i = 0; i < size; i++)
                 {
-                    BDBCommitFuture commit = _jobQueue.poll();
+                    BDBCommitFutureResult commit = _jobQueue.poll();
                     if (commit == null)
                     {
                         break;
@@ -261,7 +296,7 @@ public class CoalescingCommiter implemen
 
                     for(int i = 0; i < size; i++)
                     {
-                        BDBCommitFuture commit = _jobQueue.poll();
+                        BDBCommitFutureResult commit = _jobQueue.poll();
                         if (commit == null)
                         {
                             break;
@@ -290,7 +325,7 @@ public class CoalescingCommiter implemen
             return !_jobQueue.isEmpty();
         }
 
-        public void addJob(BDBCommitFuture commit, final boolean sync)
+        public void addJob(BDBCommitFutureResult commit, final boolean sync)
         {
             if (_stopped.get())
             {
@@ -313,7 +348,7 @@ public class CoalescingCommiter implemen
             {
                 _stopped.set(true);
                 Environment environment = _environmentFacade.getEnvironment();
-                BDBCommitFuture commit;
+                BDBCommitFutureResult commit;
                 if (environment != null && environment.isValid())
                 {
                     environment.flushLog(true);

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/CommitThreadWrapper.java Thu Mar 12 15:41:46 2015
@@ -22,16 +22,17 @@ package org.apache.qpid.server.store.ber
 
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
-
 import com.sleepycat.je.CheckpointConfig;
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Environment;
 import com.sleepycat.je.Transaction;
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.util.FutureResult;
 
 public class CommitThreadWrapper
 {
@@ -53,16 +54,16 @@ public class CommitThreadWrapper
         _commitThread.join();
     }
 
-    public StoreFuture commit(Transaction tx, boolean syncCommit)
+    public FutureResult commit(Transaction tx, boolean syncCommit)
     {
-        BDBCommitFuture commitFuture = new BDBCommitFuture(_commitThread, tx, syncCommit);
+        BDBCommitFutureResult commitFuture = new BDBCommitFutureResult(_commitThread, tx, syncCommit);
         commitFuture.commit();
         return commitFuture;
     }
 
-    private static final class BDBCommitFuture implements StoreFuture
+    private static final class BDBCommitFutureResult implements FutureResult
     {
-        private static final Logger LOGGER = Logger.getLogger(BDBCommitFuture.class);
+        private static final Logger LOGGER = Logger.getLogger(BDBCommitFutureResult.class);
 
         private final CommitThread _commitThread;
         private final Transaction _tx;
@@ -70,7 +71,7 @@ public class CommitThreadWrapper
         private boolean _complete;
         private boolean _syncCommit;
 
-        public BDBCommitFuture(CommitThread commitThread, Transaction tx, boolean syncCommit)
+        public BDBCommitFutureResult(CommitThread commitThread, Transaction tx, boolean syncCommit)
         {
             _commitThread = commitThread;
             _tx = tx;
@@ -150,13 +151,48 @@ public class CommitThreadWrapper
                 LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
             }
         }
+
+        @Override
+        public void waitForCompletion(final long timeout) throws TimeoutException
+        {
+            long startTime = System.currentTimeMillis();
+            long remaining = timeout;
+
+            while (!isComplete() && remaining > 0)
+            {
+                _commitThread.explicitNotify();
+                try
+                {
+                    wait(remaining);
+                }
+                catch (InterruptedException e)
+                {
+                    throw new StoreException(e);
+                }
+                if(!isComplete())
+                {
+                    remaining = (startTime + timeout) - System.currentTimeMillis();
+                }
+            }
+
+            if(remaining < 0)
+            {
+                throw new TimeoutException("Commit did not complete within required timeout: " + timeout);
+            }
+
+            if(LOGGER.isDebugEnabled())
+            {
+                long duration = System.currentTimeMillis() - startTime;
+                LOGGER.debug("waitForCompletion returning after " + duration + " ms for transaction " + _tx);
+            }
+        }
     }
 
     /**
-     * Implements a thread which batches and commits a queue of {@link BDBCommitFuture} operations. The commit operations
+     * Implements a thread which batches and commits a queue of {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult} operations. The commit operations
      * themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
      * continuing, but it is the responsibility of this thread to tell the commit operations when they have been
-     * completed by calling back on their {@link BDBCommitFuture#complete()} and {@link BDBCommitFuture#abort} methods.
+     * completed by calling back on their {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#complete()} and {@link org.apache.qpid.server.store.berkeleydb.CommitThreadWrapper.BDBCommitFutureResult#abort} methods.
      *
      * <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations </table>
      */
@@ -165,7 +201,7 @@ public class CommitThreadWrapper
         private static final Logger LOGGER = Logger.getLogger(CommitThread.class);
 
         private final AtomicBoolean _stopped = new AtomicBoolean(false);
-        private final Queue<BDBCommitFuture> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFuture>();
+        private final Queue<BDBCommitFutureResult> _jobQueue = new ConcurrentLinkedQueue<BDBCommitFutureResult>();
         private final CheckpointConfig _config = new CheckpointConfig();
         private final Object _lock = new Object();
         private Environment _environment;
@@ -230,7 +266,7 @@ public class CommitThreadWrapper
 
                 for(int i = 0; i < size; i++)
                 {
-                    BDBCommitFuture commit = _jobQueue.poll();
+                    BDBCommitFutureResult commit = _jobQueue.poll();
                     commit.complete();
                 }
 
@@ -243,7 +279,7 @@ public class CommitThreadWrapper
 
                     for(int i = 0; i < size; i++)
                     {
-                        BDBCommitFuture commit = _jobQueue.poll();
+                        BDBCommitFutureResult commit = _jobQueue.poll();
                         commit.abort(e);
                     }
                 }
@@ -268,7 +304,7 @@ public class CommitThreadWrapper
             return !_jobQueue.isEmpty();
         }
 
-        public void addJob(BDBCommitFuture commit, final boolean sync)
+        public void addJob(BDBCommitFutureResult commit, final boolean sync)
         {
 
             _jobQueue.add(commit);

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/Committer.java Thu Mar 12 15:41:46 2015
@@ -20,15 +20,15 @@
  */
 package org.apache.qpid.server.store.berkeleydb;
 
-import org.apache.qpid.server.store.StoreFuture;
-
 import com.sleepycat.je.Transaction;
 
+import org.apache.qpid.server.util.FutureResult;
+
 public interface Committer
 {
     void start();
 
-    StoreFuture commit(Transaction tx, boolean syncCommit);
+    FutureResult commit(Transaction tx, boolean syncCommit);
 
     void stop();
-}
\ No newline at end of file
+}

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/EnvironmentFacade.java Thu Mar 12 15:41:46 2015
@@ -27,14 +27,13 @@ import java.util.Map;
 import com.sleepycat.je.Database;
 import com.sleepycat.je.DatabaseConfig;
 import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.Environment;
 import com.sleepycat.je.EnvironmentConfig;
 import com.sleepycat.je.Sequence;
 import com.sleepycat.je.SequenceConfig;
 import com.sleepycat.je.Transaction;
 
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
 
 public interface EnvironmentFacade
 {
@@ -55,7 +54,7 @@ public interface EnvironmentFacade
 
     Transaction beginTransaction();
 
-    StoreFuture commit(com.sleepycat.je.Transaction tx, boolean sync);
+    FutureResult commit(com.sleepycat.je.Transaction tx, boolean sync);
 
     RuntimeException handleDatabaseException(String contextMessage, RuntimeException e);
 

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacade.java Thu Mar 12 15:41:46 2015
@@ -38,7 +38,7 @@ import com.sleepycat.je.Transaction;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
 import org.apache.qpid.server.store.berkeleydb.logging.Log4jLoggingHandler;
 
 public class StandardEnvironmentFacade implements EnvironmentFacade
@@ -127,7 +127,7 @@ public class StandardEnvironmentFacade i
     }
 
     @Override
-    public StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
+    public FutureResult commit(com.sleepycat.je.Transaction tx, boolean syncCommit)
     {
         try
         {

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacade.java Thu Mar 12 15:41:46 2015
@@ -73,7 +73,7 @@ import org.codehaus.jackson.map.ObjectMa
 
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.util.FutureResult;
 import org.apache.qpid.server.store.berkeleydb.BDBUtils;
 import org.apache.qpid.server.store.berkeleydb.CoalescingCommiter;
 import org.apache.qpid.server.store.berkeleydb.EnvHomeRegistry;
@@ -163,6 +163,8 @@ public class ReplicatedEnvironmentFacade
          * with NO_SYN durability in case if such Node crushes.
          */
         put(ReplicationConfig.LOG_FLUSH_TASK_INTERVAL, "1 min");
+
+        put(ReplicationConfig.CONSISTENCY_POLICY, "TimeConsistencyPolicy(1 s,30 s)");
     }});
 
     public static final String PERMITTED_NODE_LIST = "permittedNodes";
@@ -265,7 +267,7 @@ public class ReplicatedEnvironmentFacade
     }
 
     @Override
-    public StoreFuture commit(final Transaction tx, boolean syncCommit)
+    public FutureResult commit(final Transaction tx, boolean syncCommit)
     {
         try
         {
@@ -283,7 +285,7 @@ public class ReplicatedEnvironmentFacade
         {
             return _coalescingCommiter.commit(tx, syncCommit);
         }
-        return StoreFuture.IMMEDIATE_FUTURE;
+        return FutureResult.IMMEDIATE_FUTURE;
     }
 
     @Override

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHARemoteReplicationNodeImpl.java Thu Mar 12 15:41:46 2015
@@ -24,9 +24,11 @@ package org.apache.qpid.server.virtualho
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
 import com.sleepycat.je.rep.MasterStateException;
-
 import org.apache.log4j.Logger;
+
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.logging.messages.HighAvailabilityMessages;
@@ -126,7 +128,7 @@ public class BDBHARemoteReplicationNodeI
     }
 
     @StateTransition(currentState = {State.ACTIVE, State.UNAVAILABLE}, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         String nodeName = getName();
 
@@ -146,6 +148,8 @@ public class BDBHARemoteReplicationNodeI
         {
             throw new IllegalStateTransitionException("Unexpected exception on node '" + nodeName + "' deletion", e);
         }
+
+        return Futures.immediateFuture(null);
     }
 
     protected void afterSetRole()

Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java Thu Mar 12 15:41:46 2015
@@ -42,6 +42,10 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 import com.sleepycat.je.DatabaseException;
 import com.sleepycat.je.LogWriteException;
 import com.sleepycat.je.rep.NodeState;
@@ -318,7 +322,7 @@ public class BDBHAVirtualHostNodeImpl ex
     }
 
     @Override
-    protected void activate()
+    protected ListenableFuture<Void> activate()
     {
         if (LOGGER.isDebugEnabled())
         {
@@ -352,6 +356,7 @@ public class BDBHAVirtualHostNodeImpl ex
                 {
                     getEventLogger().message(getGroupLogSubject(), HighAvailabilityMessages.INTRUDER_DETECTED(node.getName(), nodeAddress));
                     shutdownOnIntruder(nodeAddress);
+
                     throw new IllegalStateException("Intruder node detected: " + nodeAddress);
                 }
             }
@@ -367,24 +372,49 @@ public class BDBHAVirtualHostNodeImpl ex
             environmentFacade.setReplicationGroupListener(new RemoteNodesDiscoverer());
             environmentFacade.setPermittedNodes(_permittedNodes);
         }
+
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition( currentState = { State.UNINITIALIZED, State.ACTIVE, State.ERRORED }, desiredState = State.STOPPED )
-    protected void doStop()
+    protected ListenableFuture<Void> doStop()
     {
-        try
-        {
-            super.doStop();
-        }
-        finally
+        final SettableFuture<Void> returnVal = SettableFuture.create();
+
+        ListenableFuture<Void> superFuture = super.doStop();
+        Futures.addCallback(superFuture, new FutureCallback<Void>()
         {
-            closeEnvironment();
+            @Override
+            public void onSuccess(final Void result)
+            {
+                doFinally();
+            }
 
-            // closing the environment does not cause a state change.  Adjust the role
-            // so that our observers will see DETACHED rather than our previous role in the group.
-            _lastRole.set(NodeRole.DETACHED);
-            attributeSet(ROLE, _role, NodeRole.DETACHED);
-        }
+            @Override
+            public void onFailure(final Throwable t)
+            {
+                doFinally();
+            }
+
+            private void doFinally()
+            {
+                try
+                {
+                    closeEnvironment();
+
+                    // closing the environment does not cause a state change.  Adjust the role
+                    // so that our observers will see DETACHED rather than our previous role in the group.
+                    _lastRole.set(NodeRole.DETACHED);
+                    attributeSet(ROLE, _role, NodeRole.DETACHED);
+                }
+                finally
+                {
+                    returnVal.set(null);
+                }
+
+            }
+        });
+        return returnVal;
     }
 
     private void closeEnvironment()
@@ -397,43 +427,52 @@ public class BDBHAVirtualHostNodeImpl ex
     }
 
     @StateTransition( currentState = { State.ACTIVE, State.STOPPED, State.ERRORED}, desiredState = State.DELETED )
-    protected void doDelete()
+    protected ListenableFuture<Void> doDelete()
     {
-        // get helpers before close. on close all children are closed and not available anymore
-        Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
-        super.doDelete();
-
-        if (getConfigurationStore() != null)
-        {
-            getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
-        }
+        // Deletes this node and, once it is in the DELETED state, removes it from the replication group.
 
-        if (getState() == State.DELETED && !helpers.isEmpty())
+        // Get the helper addresses before the delete: on close all children are closed and not
+        // available anymore.
+        final Set<InetSocketAddress> helpers = getRemoteNodeAddresses();
+        return doAfter(super.doDelete(), new Runnable()
         {
-            try
-            {
-                new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName());
-            }
-            catch(DatabaseException e)
+            @Override
+            public void run()
             {
-                LOGGER.warn("The deletion of node " + this + " on remote nodes failed due to: " + e.getMessage()
-                        + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
+                if (getConfigurationStore() != null)
+                {
+                    getEventLogger().message(getVirtualHostNodeLogSubject(), HighAvailabilityMessages.DELETED());
+                }
+
+                if (getState() == State.DELETED && !helpers.isEmpty())
+                {
+                    try
+                    {
+                        new ReplicationGroupAdmin(_groupName, helpers).removeMember(getName());
+                    }
+                    catch(DatabaseException e)
+                    {
+                        // Qualified "this": inside the anonymous Runnable a bare "this" would name the Runnable, not the node.
+                        LOGGER.warn("The deletion of node " + BDBHAVirtualHostNodeImpl.this + " on remote nodes failed due to: " + e.getMessage()
+                                    + ". To finish deletion a removal of the node from any of remote nodes (" + helpers + ") is required.");
+                    }
+                }
             }
-        }
+        });
     }
 
     @Override
-    protected void deleteVirtualHostIfExists()
+    protected ListenableFuture<Void> deleteVirtualHostIfExists()
     {
         ReplicatedEnvironmentFacade replicatedEnvironmentFacade = getReplicatedEnvironmentFacade();
         if (replicatedEnvironmentFacade != null && replicatedEnvironmentFacade.isMaster()
                 && replicatedEnvironmentFacade.getNumberOfElectableGroupMembers() == 1)
         {
-            super.deleteVirtualHostIfExists();
+            return super.deleteVirtualHostIfExists();
         }
         else
         {
-            closeVirtualHostIfExist();
+            return closeVirtualHostIfExist();
         }
     }
 
@@ -553,7 +592,7 @@ public class BDBHAVirtualHostNodeImpl ex
     {
         try
         {
-            closeVirtualHostIfExist();
+            closeVirtualHostIfExist().get();
 
             getConfigurationStore().upgradeStoreStructure();
 
@@ -640,7 +679,7 @@ public class BDBHAVirtualHostNodeImpl ex
     {
         try
         {
-            closeVirtualHostIfExist();
+            closeVirtualHostIfExist().get();
 
             Map<String, Object> hostAttributes = new HashMap<>();
             hostAttributes.put(VirtualHost.MODEL_VERSION, BrokerModel.MODEL_VERSION);
@@ -654,13 +693,24 @@ public class BDBHAVirtualHostNodeImpl ex
         }
     }
 
-    protected void closeVirtualHostIfExist()
+    protected ListenableFuture<Void> closeVirtualHostIfExist()
     {
-        VirtualHost<?,?,?> virtualHost = getVirtualHost();
+        final VirtualHost<?,?,?> virtualHost = getVirtualHost();
         if (virtualHost!= null)
         {
-            virtualHost.close();
-            childRemoved(virtualHost);
+            return doAfter(virtualHost.closeAsync(), new Runnable()
+            {
+                @Override
+                public void run()
+                {
+                        childRemoved(virtualHost);
+
+                }
+            });
+        }
+        else
+        {
+            return Futures.immediateFuture(null);
         }
     }
 
@@ -687,15 +737,19 @@ public class BDBHAVirtualHostNodeImpl ex
                         onReplica();
                         break;
                     case DETACHED:
-                        closeVirtualHostIfExist();
+                        closeVirtualHostIfExist().get();
                         break;
                     case UNKNOWN:
-                        closeVirtualHostIfExist();
+                        closeVirtualHostIfExist().get();
                         break;
                     default:
                         LOGGER.error("Unexpected state change: " + state);
                 }
             }
+            catch (InterruptedException | ExecutionException e)
+            {
+                throw new ServerScopedRuntimeException(e);
+            }
             finally
             {
                 NodeRole newRole = NodeRole.fromJeState(state);
@@ -1137,7 +1191,7 @@ public class BDBHAVirtualHostNodeImpl ex
 
                 try
                 {
-                    close();
+                    closeAsync();
                 }
                 finally
                 {

Modified: qpid/trunk/qpid/java/broker-core/pom.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/pom.xml?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/pom.xml (original)
+++ qpid/trunk/qpid/java/broker-core/pom.xml Thu Mar 12 15:41:46 2015
@@ -107,8 +107,14 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <version>${guava-version}</version>
+    </dependency>
+
      <!-- test dependencies -->
-     <dependency>
+    <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>qpid-test-utils</artifactId>
       <version>${project.version}</version>

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java Thu Mar 12 15:41:46 2015
@@ -29,9 +29,13 @@ import java.security.PrivilegedException
 import java.util.HashSet;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import javax.security.auth.Subject;
 
+import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PropertyConfigurator;
@@ -106,11 +110,28 @@ public class Broker implements BrokerShu
             {
                 if(_systemConfig != null)
                 {
-                    _systemConfig.close();
+                    // Bound the wait so that a close which never completes cannot hang the shutdown.
+                    ListenableFuture<Void> closeResult = _systemConfig.closeAsync();
+                    closeResult.get(30000L, TimeUnit.MILLISECONDS);
                 }
                 _taskExecutor.stop();
 
             }
+            catch (TimeoutException e)
+            {
+                LOGGER.warn("Attempting to cleanly shutdown took too long, exiting immediately");
+            }
+            catch (InterruptedException e)
+            {
+                // Restore the interrupt status so that the caller can still observe the interruption.
+                Thread.currentThread().interrupt();
+                LOGGER.warn("Interrupted while attempting to cleanly shutdown, exiting immediately", e);
+            }
+            catch (ExecutionException e)
+            {
+                // The close itself failed: report its cause instead of a misleading timeout message.
+                LOGGER.warn("Failed to cleanly shutdown, exiting immediately", e.getCause());
+            }
             finally
             {
                 if (_configuringOwnLogging)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java Thu Mar 12 15:41:46 2015
@@ -28,6 +28,9 @@ import java.util.concurrent.CopyOnWriteA
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.configuration.IllegalConfigurationException;
 import org.apache.qpid.server.configuration.updater.VoidTask;
 import org.apache.qpid.server.exchange.AbstractExchange;
@@ -196,7 +199,7 @@ public class BindingImpl
     }
 
     @StateTransition(currentState = State.ACTIVE, desiredState = State.DELETED)
-    private void doDelete()
+    private ListenableFuture<Void> doDelete()
     {
         if(_deleted.compareAndSet(false,true))
         {
@@ -209,12 +212,14 @@ public class BindingImpl
 
         deleted();
         setState(State.DELETED);
+        return Futures.immediateFuture(null);
     }
 
     @StateTransition(currentState = State.UNINITIALIZED, desiredState = State.ACTIVE)
-    private void activate()
+    private ListenableFuture<Void> activate()
     {
         setState(State.ACTIVE);
+        return Futures.immediateFuture(null);
     }
 
     public void addStateChangeListener(StateChangeListener<BindingImpl,State> listener)

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java Thu Mar 12 15:41:46 2015
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.configuration.updater;
 
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.Future;
 
 public interface TaskExecutor
@@ -43,4 +44,7 @@ public interface TaskExecutor
 
     <T> Future<T> submit(Task<T> task) throws CancellationException;
 
+    boolean isTaskExecutorThread();
+
+    Executor getExecutor();
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutorImpl.java Thu Mar 12 15:41:46 2015
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -48,6 +49,7 @@ public class TaskExecutorImpl implements
     private volatile Thread _taskThread;
     private final AtomicBoolean _running = new AtomicBoolean();
     private volatile ExecutorService _executor;
+    private final ImmediateIfSameThreadExecutor _wrappedExecutor = new ImmediateIfSameThreadExecutor();
 
 
     @Override
@@ -67,7 +69,7 @@ public class TaskExecutorImpl implements
                 @Override
                 public Thread newThread(Runnable r)
                 {
-                    _taskThread = new Thread(r, TASK_EXECUTION_THREAD_NAME);
+                    _taskThread = new TaskThread(r, TASK_EXECUTION_THREAD_NAME, TaskExecutorImpl.this);
                     return _taskThread;
                 }
             });
@@ -277,7 +279,13 @@ public class TaskExecutorImpl implements
         }
     }
 
-    private boolean isTaskExecutorThread()
+    @Override
+    public Executor getExecutor()
+    {
+        return _wrappedExecutor;
+    }
+
+    public boolean isTaskExecutorThread()
     {
         return Thread.currentThread() == _taskThread;
     }
@@ -373,4 +381,54 @@ public class TaskExecutorImpl implements
             return get();
         }
     }
+
+    /**
+     * Executor that runs a command inline when the caller is already on this task executor's thread and
+     * otherwise hands it to the underlying executor service.
+     */
+    private class ImmediateIfSameThreadExecutor implements Executor
+    {
+
+        @Override
+        public void execute(final Runnable command)
+        {
+            // Read the volatile field once: it must not change between the null check and the call below.
+            final ExecutorService executor = _executor;
+            final Thread currentThread = Thread.currentThread();
+            if(isTaskExecutorThread()
+               || (executor == null && (currentThread instanceof TaskThread
+                   && ((TaskThread)currentThread).getTaskExecutor() == TaskExecutorImpl.this)))
+            {
+                command.run();
+            }
+            else if(executor == null)
+            {
+                // No executor service to hand the command to; fail as the Executor contract prescribes
+                // instead of with a NullPointerException.
+                throw new java.util.concurrent.RejectedExecutionException("Task executor is not running");
+            }
+            else
+            {
+                executor.execute(command);
+            }
+
+        }
+    }
+
+    private static class TaskThread extends Thread
+    {
+
+        private final TaskExecutorImpl _taskExecutor;
+
+        public TaskThread(final Runnable r, final String name, final TaskExecutorImpl taskExecutor)
+        {
+            super(r, name);
+            _taskExecutor = taskExecutor;
+        }
+
+        public TaskExecutorImpl getTaskExecutor()
+        {
+            return _taskExecutor;
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionPrincipal.java Thu Mar 12 15:41:46 2015
@@ -20,11 +20,12 @@
  */
 package org.apache.qpid.server.connection;
 
+import java.net.SocketAddress;
+
+import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.security.auth.SocketConnectionPrincipal;
 
-import java.net.SocketAddress;
-
 public class ConnectionPrincipal implements SocketConnectionPrincipal
 {
     private final AMQConnectionModel _connection;
@@ -51,6 +52,11 @@ public class ConnectionPrincipal impleme
         return _connection;
     }
 
+    public VirtualHost<?,?,?> getVirtualHost()
+    {
+        return _connection.getVirtualHost();
+    }
+
     @Override
     public boolean equals(final Object o)
     {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Thu Mar 12 15:41:46 2015
@@ -74,7 +74,7 @@ public class ConnectionRegistry implemen
                 AMQConnectionModel connection = itr.next();
                 try
                 {
-                    connection.close(AMQConstant.CONNECTION_FORCED, replyText);
+                    connection.closeAsync(AMQConstant.CONNECTION_FORCED, replyText);
                 }
                 catch (Exception e)
                 {

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/AbstractConsumerTarget.java Thu Mar 12 15:41:46 2015
@@ -23,17 +23,21 @@ package org.apache.qpid.server.consumer;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.server.message.MessageInstance;
 import org.apache.qpid.server.util.StateChangeListener;
 
 public abstract class AbstractConsumerTarget implements ConsumerTarget
 {
-
+    private static final Logger LOGGER = Logger.getLogger(AbstractConsumerTarget.class);
     private final AtomicReference<State> _state;
 
     private final Set<StateChangeListener<ConsumerTarget, State>> _stateChangeListeners = new
@@ -41,6 +45,7 @@ public abstract class AbstractConsumerTa
 
     private final Lock _stateChangeLock = new ReentrantLock();
     private final AtomicInteger _stateActivates = new AtomicInteger();
+    private final ConcurrentLinkedQueue<ConsumerMessageInstancePair> _queue = new ConcurrentLinkedQueue<ConsumerMessageInstancePair>(); // filled by send(), drained by sendNextMessage()
 
 
     protected AbstractConsumerTarget(final State initialState)
@@ -48,6 +53,26 @@ public abstract class AbstractConsumerTa
         _state = new AtomicReference<State>(initialState);
     }
 
+    @Override
+    public void processPending()
+    {
+        while(hasMessagesToSend())
+        {
+            sendNextMessage();
+        }
+
+        processClosed();
+    }
+
+    protected abstract void processClosed();
+
+    @Override
+    public final boolean isSuspended()
+    {
+        return getSessionModel().getConnectionModel().isMessageAssignmentSuspended() || doIsSuspended();
+    }
+
+    protected abstract boolean doIsSuspended();
 
     public final State getState()
     {
@@ -101,6 +126,7 @@ public abstract class AbstractConsumerTa
         }
     }
 
+    @Override
     public final void notifyCurrentState()
     {
 
@@ -136,4 +162,41 @@ public abstract class AbstractConsumerTa
         _stateChangeLock.unlock();
     }
 
+    @Override
+    public final long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch)
+    {
+        _queue.add(new ConsumerMessageInstancePair(consumer, entry, batch));
+
+        getSessionModel().getConnectionModel().notifyWork();
+        return entry.getMessage().getSize();
+    }
+
+    protected abstract void doSend(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
+
+    @Override
+    public boolean hasMessagesToSend()
+    {
+        return !_queue.isEmpty();
+    }
+
+    @Override
+    public void sendNextMessage()
+    {
+        // poll() removes and returns the head atomically; peeking first and polling afterwards would
+        // not be safe if more than one thread drained the queue.
+        final ConsumerMessageInstancePair consumerMessage = _queue.poll();
+        if (consumerMessage != null)
+        {
+            ConsumerImpl consumer = consumerMessage.getConsumer();
+            MessageInstance entry = consumerMessage.getEntry();
+            boolean batch = consumerMessage.isBatch();
+            doSend(consumer, entry, batch);
+
+            // The acquisition is unlocked only after doSend() has returned.
+            if (consumer.acquires())
+            {
+                entry.unlockAcquisition();
+            }
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java Thu Mar 12 15:41:46 2015
@@ -22,6 +22,8 @@ package org.apache.qpid.server.consumer;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import com.google.common.util.concurrent.ListenableFuture;
+
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 
@@ -31,6 +33,8 @@ public interface ConsumerImpl
 
     void externalStateChange();
 
+    ConsumerTarget getTarget();
+
     enum Option
     {
         ACQUIRES,

Modified: qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java?rev=1666224&r1=1666223&r2=1666224&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java (original)
+++ qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java Thu Mar 12 15:41:46 2015
@@ -33,6 +33,8 @@ public interface ConsumerTarget
 
     void removeStateChangeListener(StateChangeListener<ConsumerTarget, State> listener);
 
+    void processPending();
+
     enum State
     {
         ACTIVE, SUSPENDED, CLOSED
@@ -44,6 +46,8 @@ public interface ConsumerTarget
 
     void consumerRemoved(ConsumerImpl sub);
 
+    void notifyCurrentState();
+
     void addStateListener(StateChangeListener<ConsumerTarget, State> listener);
 
     long getUnacknowledgedBytes();
@@ -54,6 +58,10 @@ public interface ConsumerTarget
 
     long send(final ConsumerImpl consumer, MessageInstance entry, boolean batch);
 
+    boolean hasMessagesToSend();
+
+    void sendNextMessage();
+
     void flushBatched();
 
     void queueDeleted();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org