You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/14 15:46:38 UTC
svn commit: r1558056 [1/4] - in /qpid/branches/java-broker-bdb-ha/qpid/java:
./ amqp-1-0-client-jms/
amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/
amqp-1-0-client-websocket/ amqp-1-0-client-websocket/resources/
amqp-1-0-client/ a...
Author: kwall
Date: Tue Jan 14 14:46:35 2014
New Revision: 1558056
URL: http://svn.apache.org/r1558056
Log:
NO-JIRA: Merge latest changes back from trunk.
Changes merged with command:
svn merge -r 1549894:1558036 https://svn.apache.org/repos/asf/qpid/trunk
Added:
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-websocket/ (props changed)
- copied from r1558036, qpid/trunk/qpid/java/amqp-1-0-client-websocket/
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java
- copied unchanged from r1558036, qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/SSLUtil.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java
- copied unchanged from r1558036, qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProviderFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java
- copied unchanged from r1558036, qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TCPTransportProvier.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
- copied unchanged from r1558036, qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProvider.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java
- copied unchanged from r1558036, qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/TransportProviderFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/resources/
- copied from r1558036, qpid/trunk/qpid/java/amqp-1-0-client/src/main/resources/
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java
- copied unchanged from r1558036, qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ExceptionHandler.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java
- copied unchanged from r1558036, qpid/trunk/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/TransportProviderFactory.java
- copied unchanged from r1558036, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/TransportProviderFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/
- copied from r1558036, qpid/trunk/qpid/java/broker-core/src/main/java/org/apache/qpid/server/transport/
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory
- copied unchanged from r1558036, qpid/trunk/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.TransportProviderFactory
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/websocket/
- copied from r1558036, qpid/trunk/qpid/java/broker-plugins/websocket/
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestHttpsClientCertAuthTest.java
- copied unchanged from r1558036, qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/BrokerRestHttpsClientCertAuthTest.java
Removed:
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/SocketExceptionHandler.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/federation/
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/management/
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/access/plugins/
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/rmi/
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/signal/
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/firewall/
Modified:
qpid/branches/java-broker-bdb-ha/qpid/java/ (props changed)
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/ (props changed)
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/ (props changed)
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-websocket/resources/ (props changed)
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/pom.xml
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/ (props changed)
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/codec/SymbolTypeConstructor.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/ConnectionHandler.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/transport/SessionEndpoint.java
qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/transport/Error.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/bdb_ha/add.html
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/src/main/java/resources/virtualhost/store/bdb/add.html
qpid/branches/java-broker-bdb-ha/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/HAClusterManagementTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/ (props changed)
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchange.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeType.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeType.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FilterSupport.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeType.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeType.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterManagerFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Connection.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Transport.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/AmqpPortAdapter.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortAdapter.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/PortFactory.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/plugin/ExchangeType.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/ (props changed)
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SubjectCreator.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java (contents, props changed)
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/DurableConfigurationStoreHelper.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/subscription/Subscription.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ (props changed)
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/VirtualHostConfigurationTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/logging/subjects/BindingLogSubjectTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/InboundMessageAdapterTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaData.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-core/src/test/java/org/apache/qpid/server/subscription/MockSubscription.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/access-control/src/test/java/org/apache/qpid/server/security/access/plugins/RuleSetTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageTransferMessage.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ (props changed)
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/amqp-msg-conv-0-8-to-0-10/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/derby-store/src/main/java/resources/virtualhost/store/derby/add.html
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-provider-bone/src/main/java/resources/virtualhost/store/pool/bonecp/add.html
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/jdbc/add.html
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/pool/ (props changed)
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/jdbc-store/src/main/java/resources/virtualhost/store/pool/none/add.html
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagement.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/SaslServlet.java
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/src/main/java/resources/addPort.html
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/src/main/java/resources/footer.html
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Connection.js
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addPort.js
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/src/main/java/resources/showConnection.html
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/src/main/java/resources/showMessage.html
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/management-http/src/main/java/resources/virtualhost/standard/add.html
qpid/branches/java-broker-bdb-ha/qpid/java/broker-plugins/memory-store/src/main/java/resources/virtualhost/store/memory/add.html
qpid/branches/java-broker-bdb-ha/qpid/java/build.deps
qpid/branches/java-broker-bdb-ha/qpid/java/build.xml
qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_8.java
qpid/branches/java-broker-bdb-ha/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractAMQMessageDelegate.java
qpid/branches/java-broker-bdb-ha/qpid/java/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
qpid/branches/java-broker-bdb-ha/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkTransport.java
qpid/branches/java-broker-bdb-ha/qpid/java/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/ivy.nexus.xml
qpid/branches/java-broker-bdb-ha/qpid/java/jca/README-GERONIMO.txt
qpid/branches/java-broker-bdb-ha/qpid/java/jca/README-JBOSS-EAP6.txt
qpid/branches/java-broker-bdb-ha/qpid/java/jca/README-JBOSS.txt
qpid/branches/java-broker-bdb-ha/qpid/java/jca/README.txt
qpid/branches/java-broker-bdb-ha/qpid/java/jca/example/README-EXAMPLE.txt
qpid/branches/java-broker-bdb-ha/qpid/java/jca/example/README-GERONIMO.txt
qpid/branches/java-broker-bdb-ha/qpid/java/jca/example/README-GLASSFISH.txt
qpid/branches/java-broker-bdb-ha/qpid/java/jca/example/README-JBOSS.txt
qpid/branches/java-broker-bdb-ha/qpid/java/jca/example/README-JBOSS7.txt
qpid/branches/java-broker-bdb-ha/qpid/java/jca/example/conf/glassfish-ejb-jar.xml
qpid/branches/java-broker-bdb-ha/qpid/java/jca/example/conf/glassfish-resources.xml
qpid/branches/java-broker-bdb-ha/qpid/java/jca/example/conf/glassfish-web.xml
qpid/branches/java-broker-bdb-ha/qpid/java/module.xml
qpid/branches/java-broker-bdb-ha/qpid/java/pom.xml
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngineFactoryTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageGroupQueueTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/management/jmx/ExchangeManagementTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PreferencesRestTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/QpidRestTestCase.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/RestTestHelper.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/SaslRestTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/test/client/message/JMSDestinationTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/DynamicQueueExchangeCreateTest.java
qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java:r1549895-1558036
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/amqp-1-0-client:r1549895-1558036
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/amqp-1-0-client-jms:r1549895-1558036
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionFactoryImpl.java Tue Jan 14 14:46:35 2014
@@ -26,16 +26,27 @@ import java.net.URL;
import java.net.URLConnection;
import java.net.URLDecoder;
import java.net.URLStreamHandler;
+import java.security.GeneralSecurityException;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
+import java.util.HashMap;
+import java.util.Map;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManagerFactory;
+
+import org.apache.qpid.amqp_1_0.client.SSLUtil;
import org.apache.qpid.amqp_1_0.jms.ConnectionFactory;
public class ConnectionFactoryImpl implements ConnectionFactory, TopicConnectionFactory, QueueConnectionFactory
{
+ private final String _protocol;
private String _host;
private int _port;
private String _username;
@@ -49,6 +60,13 @@ public class ConnectionFactoryImpl imple
private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
private int _maxSessions = Integer.getInteger("qpid.max_sessions", 0);
+ private int _maxPrefetch;
+ private String _keyStorePath;
+ private String _keyStorePassword;
+ private String _keyStoreCertAlias;
+ private String _trustStorePath;
+ private String _trustStorePassword;
+ private SSLContext _sslContext;
public ConnectionFactoryImpl(final String host,
@@ -98,6 +116,20 @@ public class ConnectionFactoryImpl imple
final boolean ssl,
final int maxSessions)
{
+ this(ssl?"amqps":"amqp",host,port,username,password,clientId,remoteHost,ssl,maxSessions);
+ }
+
+ public ConnectionFactoryImpl(final String protocol,
+ final String host,
+ final int port,
+ final String username,
+ final String password,
+ final String clientId,
+ final String remoteHost,
+ final boolean ssl,
+ final int maxSessions)
+ {
+ _protocol = protocol;
_host = host;
_port = port;
_username = username;
@@ -113,118 +145,342 @@ public class ConnectionFactoryImpl imple
return createConnection(_username, _password);
}
- public ConnectionImpl createConnection(final String username, final String password) throws JMSException
+ public ConnectionImpl createConnection(String username, final String password) throws JMSException
{
- ConnectionImpl connection = new ConnectionImpl(_host, _port, username, password, _clientId, _remoteHost, _ssl, _maxSessions);
+ synchronized (this)
+ {
+ if(_ssl && _sslContext == null)
+ {
+ try
+ {
+ _sslContext = SSLUtil.buildSslContext(_keyStoreCertAlias,_keyStorePath,
+ KeyStore.getDefaultType(),
+ _keyStorePassword,
+ KeyManagerFactory.getDefaultAlgorithm(),
+ _trustStorePath,_trustStorePassword,
+ KeyStore.getDefaultType(),
+ TrustManagerFactory.getDefaultAlgorithm());
+ if(username == null && _keyStoreCertAlias != null)
+ {
+ X509Certificate[] certs = SSLUtil.getClientCertificates(_keyStoreCertAlias,
+ _keyStorePath,
+ _keyStorePassword,
+ KeyStore.getDefaultType(),
+ KeyManagerFactory.getDefaultAlgorithm());
+ if(certs != null && certs.length != 0)
+ {
+ username = certs[0].getSubjectDN().getName();
+ }
+ }
+
+ }
+ catch (GeneralSecurityException e)
+ {
+ final JMSException jmsException = new JMSException("Unable to create SSL context");
+ jmsException.setLinkedException(e);
+ jmsException.initCause(e);
+ throw jmsException;
+ }
+ catch (IOException e)
+ {
+ final JMSException jmsException = new JMSException("Unable to create SSL context");
+ jmsException.setLinkedException(e);
+ jmsException.initCause(e);
+ throw jmsException; }
+ }
+ }
+ ConnectionImpl connection = new ConnectionImpl(_protocol,_host, _port, username, password, _clientId, _remoteHost, _sslContext, _maxSessions);
connection.setQueuePrefix(_queuePrefix);
connection.setTopicPrefix(_topicPrefix);
connection.setUseBinaryMessageId(_useBinaryMessageId);
connection.setSyncPublish(_syncPublish);
+ if(_maxPrefetch != 0)
+ {
+ connection.setMaxPrefetch(_maxPrefetch);
+ }
return connection;
}
+ public void setMaxPrefetch(final int maxPrefetch)
+ {
+ _maxPrefetch = maxPrefetch;
+ }
+
+ public void setKeyStorePath(final String keyStorePath)
+ {
+ _keyStorePath = keyStorePath;
+ }
+
+ public void setKeyStorePassword(final String keyStorePassword)
+ {
+ _keyStorePassword = keyStorePassword;
+ }
+
+ public void setKeyStoreCertAlias(final String keyStoreCertAlias)
+ {
+ _keyStoreCertAlias = keyStoreCertAlias;
+ }
+
+ public void setTrustStorePath(final String trustStorePath)
+ {
+ _trustStorePath = trustStorePath;
+ }
+
+ public void setTrustStorePassword(final String trustStorePassword)
+ {
+ _trustStorePassword = trustStorePassword;
+ }
+
+ private static class ConnectionOptions
+ {
+ String username;
+ String password;
+ String clientId;
+ String remoteHost;
+
+ boolean binaryMessageId = true;
+ boolean syncPublish;
+ int maxSessions;
+ public boolean ssl;
+ public int maxPrefetch;
+ public String trustStorePath;
+ public String trustStorePassword;
+ public String keyStorePath;
+ public String keyStorePassword;
+ public String keyStoreCertAlias;
+ }
+
+
+
+ private static abstract class OptionSetter
+ {
+
+ private static final Map<String, OptionSetter> OPTION_SETTER_MAP = new HashMap<String, OptionSetter>();
+ private final String _name;
+ private final String _description;
+
+ public OptionSetter(String name, String description)
+ {
+ OPTION_SETTER_MAP.put(name.toLowerCase(), this);
+ _name = name;
+ _description = description;
+ }
+
+ public abstract void setOption(ConnectionOptions options, String value) throws MalformedURLException;
+
+ public static void parseOptions(URL url, ConnectionOptions options) throws MalformedURLException
+ {
+ String query = url.getQuery();
+ if(query != null)
+ {
+ for(String param : query.split("&"))
+ {
+
+ String[] keyValuePair = param.split("=",2);
+ OptionSetter setter = OPTION_SETTER_MAP.get(keyValuePair[0]);
+ if(setter != null)
+ {
+ setter.setOption(options, keyValuePair[1]);
+ }
+ else
+ {
+ throw new MalformedURLException("Unknown URL option: '"+keyValuePair[0]+"' in connection URL");
+ }
+
+ }
+ }
+ }
+ }
+
+ private static final OptionSetter[] _options =
+ {
+ new OptionSetter("clientid", "JMS client id / AMQP container id")
+ {
+ public void setOption(ConnectionOptions options, String value)
+ {
+ options.clientId = value;
+ }
+ },
+ new OptionSetter("ssl", "Set to \"true\" to use SSL encryption")
+ {
+ public void setOption(ConnectionOptions options, String value)
+ {
+ options.ssl = Boolean.valueOf(value);
+ }
+ },
+ new OptionSetter("remote-host", "AMQP remote host")
+ {
+ public void setOption(ConnectionOptions options, String value)
+ {
+ options.remoteHost = value;
+ }
+ },
+ new OptionSetter("binary-messageid", "Use binary (rather than String) message ids")
+ {
+ public void setOption(ConnectionOptions options, String value)
+ {
+ options.binaryMessageId = Boolean.parseBoolean(value);
+ }
+ },
+ new OptionSetter("sync-publish", "Wait for acknowledge when sending messages")
+ {
+ public void setOption(ConnectionOptions options, String value)
+ {
+ options.syncPublish = Boolean.parseBoolean(value);
+ }
+ },
+ new OptionSetter("max-sessions", "set maximum number of sessions allowed")
+ {
+ public void setOption(ConnectionOptions options, String value)
+ {
+ options.maxSessions = Integer.parseInt(value);
+ }
+ },
+ new OptionSetter("max-prefetch", "set maximum number of messages prefetched on a link")
+ {
+ public void setOption(ConnectionOptions options, String value)
+ {
+ options.maxPrefetch = Integer.parseInt(value);
+ }
+ },
+ new OptionSetter("trust-store","")
+ {
+ public void setOption(final ConnectionOptions options, final String value) throws MalformedURLException
+ {
+ options.trustStorePath = value;
+ }
+ },
+ new OptionSetter("trust-store-password","")
+ {
+ public void setOption(final ConnectionOptions options, final String value) throws MalformedURLException
+ {
+ options.trustStorePassword = value;
+ }
+ },
+ new OptionSetter("key-store","")
+ {
+ public void setOption(final ConnectionOptions options, final String value) throws MalformedURLException
+ {
+ options.keyStorePath = value;
+ }
+ },
+ new OptionSetter("key-store-password","")
+ {
+ public void setOption(final ConnectionOptions options, final String value) throws MalformedURLException
+ {
+ options.keyStorePassword = value;
+ }
+ },
+ new OptionSetter("ssl-cert-alias","")
+ {
+ public void setOption(final ConnectionOptions options, final String value) throws MalformedURLException
+ {
+ options.keyStoreCertAlias = value;
+ }
+ }
+ };
+
public static ConnectionFactoryImpl createFromURL(final String urlString) throws MalformedURLException
{
URL url = new URL(null, urlString, new URLStreamHandler()
- {
- @Override
- protected URLConnection openConnection(URL u) throws IOException
- {
- throw new UnsupportedOperationException();
- }
- });
+ {
+ @Override
+ protected URLConnection openConnection(URL u) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+ });
String protocol = url.getProtocol();
- if(protocol == null || "".equals(protocol))
+ if (protocol == null || "".equals(protocol))
{
protocol = "amqp";
}
- else if(!protocol.equals("amqp") && !protocol.equals("amqps"))
- {
- throw new MalformedURLException("Protocol '"+protocol+"' unknown. Must be one of 'amqp' or 'amqps'.");
- }
String host = url.getHost();
int port = url.getPort();
- boolean ssl = false;
+ final ConnectionOptions options = new ConnectionOptions();
- if(port == -1)
+ if (port == -1)
{
- if("amqps".equals(protocol))
+ if ("amqps".equals(protocol))
{
port = 5671;
- ssl = true;
+ options.ssl = true;
}
- else
+ else if("amqp".equals(protocol))
{
port = 5672;
}
+ else if("ws".equals(protocol))
+ {
+ port = 80;
+ }
+ else if("wss".equals(protocol))
+ {
+ port = 443;
+ }
}
- else if("amqps".equals(protocol))
+ else if ("amqps".equals(protocol) || "wss".equals(protocol))
{
- ssl = true;
+ options.ssl = true;
}
- String userInfo = url.getUserInfo();
- String username = null;
- String password = null;
- String clientId = null;
- String remoteHost = null;
- boolean binaryMessageId = true;
- boolean syncPublish = false;
- int maxSessions = 0;
+ String userInfo = url.getUserInfo();
- if(userInfo != null)
+ if (userInfo != null)
{
- String[] components = userInfo.split(":",2);
- username = URLDecoder.decode(components[0]);
- if(components.length == 2)
+ String[] components = userInfo.split(":", 2);
+ options.username = URLDecoder.decode(components[0]);
+ if (components.length == 2)
{
- password = URLDecoder.decode(components[1]);
- }
- }
- String query = url.getQuery();
- if(query != null)
- {
- for(String param : query.split("&"))
- {
- String[] keyValuePair = param.split("=",2);
- if(keyValuePair[0].equalsIgnoreCase("clientid"))
- {
- clientId = keyValuePair[1];
- }
- else if(keyValuePair[0].equalsIgnoreCase("ssl"))
- {
- ssl = Boolean.valueOf(keyValuePair[1]);
- }
- else if(keyValuePair[0].equalsIgnoreCase("remote-host"))
- {
- remoteHost = keyValuePair[1];
- }
- else if (keyValuePair[0].equalsIgnoreCase("binary-messageid"))
- {
- binaryMessageId = Boolean.parseBoolean(keyValuePair[1]);
- }
- else if (keyValuePair[0].equalsIgnoreCase("sync-publish"))
- {
- syncPublish = Boolean.parseBoolean(keyValuePair[1]);
- }
- else if(keyValuePair[0].equalsIgnoreCase("max-sessions"))
- {
- maxSessions = Integer.parseInt(keyValuePair[1]);
- }
+ options.password = URLDecoder.decode(components[1]);
}
}
- if(remoteHost == null)
+ OptionSetter.parseOptions(url, options);
+
+ if (options.remoteHost == null)
{
- remoteHost = host;
+ options.remoteHost = host;
}
ConnectionFactoryImpl connectionFactory =
- new ConnectionFactoryImpl(host, port, username, password, clientId, remoteHost, ssl, maxSessions);
- connectionFactory.setUseBinaryMessageId(binaryMessageId);
- connectionFactory.setSyncPublish(syncPublish);
+ new ConnectionFactoryImpl(protocol,
+ host,
+ port,
+ options.username,
+ options.password,
+ options.clientId,
+ options.remoteHost,
+ options.ssl,
+ options.maxSessions);
+ connectionFactory.setUseBinaryMessageId(options.binaryMessageId);
+ connectionFactory.setSyncPublish(options.syncPublish);
+ if (options.maxPrefetch != 0)
+ {
+ connectionFactory.setMaxPrefetch(options.maxPrefetch);
+ }
+ if (options.keyStorePath != null)
+ {
+ connectionFactory.setKeyStorePath(options.keyStorePath);
+ }
+ if (options.keyStorePassword != null)
+ {
+ connectionFactory.setKeyStorePassword(options.keyStorePassword);
+ }
+ if (options.keyStoreCertAlias != null)
+ {
+ connectionFactory.setKeyStoreCertAlias(options.keyStoreCertAlias);
+ }
+ if (options.trustStorePath != null)
+ {
+ connectionFactory.setTrustStorePath(options.trustStorePath);
+ }
+ if (options.trustStorePassword != null)
+ {
+ connectionFactory.setTrustStorePassword(options.trustStorePassword);
+ }
return connectionFactory;
@@ -287,4 +543,6 @@ public class ConnectionFactoryImpl imple
{
_syncPublish = syncPublish;
}
+
+
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/ConnectionImpl.java Tue Jan 14 14:46:35 2014
@@ -28,7 +28,9 @@ import org.apache.qpid.amqp_1_0.transpor
import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.jms.Queue;
+import javax.net.ssl.SSLContext;
+import java.security.NoSuchAlgorithmException;
import java.util.*;
import org.apache.qpid.amqp_1_0.type.Symbol;
@@ -38,6 +40,8 @@ import org.apache.qpid.amqp_1_0.type.tra
public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
{
+ private final String _protocol;
+ private final SSLContext _sslContext;
private ConnectionMetaData _connectionMetaData;
private volatile ExceptionListener _exceptionListener;
@@ -54,13 +58,18 @@ public class ConnectionImpl implements C
private final String _username;
private final String _password;
private String _remoteHost;
- private final boolean _ssl;
private String _clientId;
private String _queuePrefix;
private String _topicPrefix;
private boolean _useBinaryMessageId = Boolean.parseBoolean(System.getProperty("qpid.use_binary_message_id", "true"));
private boolean _syncPublish = Boolean.parseBoolean(System.getProperty("qpid.sync_publish", "false"));
private int _maxSessions;
+ private int _maxPrefetch;
+
+ public void setMaxPrefetch(final int maxPrefetch)
+ {
+ _maxPrefetch = maxPrefetch;
+ }
private static enum State
{
@@ -87,15 +96,50 @@ public class ConnectionImpl implements C
this(host, port, username, password, clientId, remoteHost, ssl,0);
}
+
public ConnectionImpl(String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException
{
+ this(ssl?"amqps":"amqp",host,port,username,password,clientId,remoteHost,ssl,maxSessions);
+ }
+
+ public ConnectionImpl(String protocol, String host, int port, String username, String password, String clientId, String remoteHost, boolean ssl, int maxSessions) throws JMSException
+ {
+ this(protocol,
+ host,
+ port,
+ username,
+ password,
+ clientId,
+ remoteHost,
+ ssl ? getDefaultSSLContext() : null,
+ maxSessions);
+ }
+
+ private static SSLContext getDefaultSSLContext() throws JMSException
+ {
+ try
+ {
+ return SSLContext.getDefault();
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ JMSException jmsException = new JMSException(e.getMessage());
+ jmsException.setLinkedException(e);
+ jmsException.initCause(e);
+ throw jmsException;
+ }
+ }
+
+ public ConnectionImpl(String protocol, String host, int port, String username, String password, String clientId, String remoteHost, SSLContext sslContext, int maxSessions) throws JMSException
+ {
+ _protocol = protocol;
_host = host;
_port = port;
_username = username;
_password = password;
_clientId = clientId;
_remoteHost = remoteHost;
- _ssl = ssl;
+ _sslContext = sslContext;
_maxSessions = maxSessions;
}
@@ -109,11 +153,11 @@ public class ConnectionImpl implements C
_state = State.STOPPED;
Container container = _clientId == null ? new Container() : new Container(_clientId);
- // TODO - authentication, containerId, clientId, ssl?, etc
+
try
{
- _conn = new org.apache.qpid.amqp_1_0.client.Connection(_host,
- _port, _username, _password, container, _remoteHost, _ssl,
+ _conn = new org.apache.qpid.amqp_1_0.client.Connection(_protocol, _host,
+ _port, _username, _password, container, _remoteHost, _sslContext,
_maxSessions - 1);
_conn.setConnectionErrorTask(new ConnectionErrorTask());
// TODO - retrieve negotiated AMQP version
@@ -182,6 +226,10 @@ public class ConnectionImpl implements C
SessionImpl session = new SessionImpl(this, acknowledgeMode);
session.setQueueSession(_isQueueConnection);
session.setTopicSession(_isTopicConnection);
+ if(_maxPrefetch != 0)
+ {
+ session.setMaxPrefetch(_maxPrefetch);
+ }
boolean connectionStarted = false;
synchronized(_lock)
@@ -370,21 +418,47 @@ public class ConnectionImpl implements C
_lock.notifyAll();
}
-
+
+ List<JMSException> errors = new ArrayList<JMSException>();
+
if (sessions != null)
{
for(SessionImpl session : sessions)
{
- session.close();
+ try
+ {
+ session.close();
+ }
+ catch(JMSException e)
+ {
+ errors.add(e);
+ }
}
for(CloseTask task : closeTasks)
{
task.onClose();
}
- if(closeConnection) {
- _conn.close();
+ if(closeConnection)
+ {
+ try
+ {
+ _conn.close();
+ }
+ catch (ConnectionErrorException e)
+ {
+ final JMSException jmsException = new JMSException("Error while closing connection: " + e.getMessage());
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
}
}
+
+ if(!errors.isEmpty())
+ {
+ final JMSException jmsException = new JMSException("Error while closing connection: " + errors.get(0).getMessage());
+ jmsException.setLinkedException(errors.get(0));
+ throw jmsException;
+ }
}
private void checkClosed() throws IllegalStateException
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java Tue Jan 14 14:46:35 2014
@@ -76,6 +76,7 @@ public class MessageConsumerImpl impleme
private Binary _lastTxnUpdate;
private final List<Message> _recoverReplayMessages = new ArrayList<Message>();
private final List<Message> _replaymessages = new ArrayList<Message>();
+ private int _maxPrefetch = 100;
MessageConsumerImpl(final Destination destination,
final SessionImpl session,
@@ -117,6 +118,10 @@ public class MessageConsumerImpl impleme
throw new InvalidDestinationException("Invalid destination class " + destination.getClass().getName());
}
_session = session;
+ if(session.getMaxPrefetch() != 0)
+ {
+ _maxPrefetch = session.getMaxPrefetch();
+ }
_receiver = createClientReceiver();
_receiver.setRemoteErrorListener(new Runnable()
@@ -442,7 +447,7 @@ public class MessageConsumerImpl impleme
public void start()
{
- _receiver.setCredit(UnsignedInteger.valueOf(100), true);
+ _receiver.setCredit(UnsignedInteger.valueOf(getMaxPrefetch()), true);
}
public Queue getQueue() throws JMSException
@@ -487,4 +492,14 @@ public class MessageConsumerImpl impleme
}
}
}
+
+ public int getMaxPrefetch()
+ {
+ return _maxPrefetch;
+ }
+
+ public void setMaxPrefetch(final int maxPrefetch)
+ {
+ _maxPrefetch = maxPrefetch;
+ }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java Tue Jan 14 14:46:35 2014
@@ -34,6 +34,8 @@ import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.jms.Message;
import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
import org.apache.qpid.amqp_1_0.type.messaging.Rejected;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
@@ -221,7 +223,7 @@ public class MessageProducerImpl impleme
}
catch (Sender.SenderClosingException e)
{
- final JMSException jmsException = new JMSException("error closing");
+ final JMSException jmsException = new JMSException("Error closing producer: " + e.getMessage());
jmsException.setLinkedException(e);
throw jmsException;
}
@@ -299,8 +301,8 @@ public class MessageProducerImpl impleme
final org.apache.qpid.amqp_1_0.client.Message clientMessage = new org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
DispositionAction action = null;
-
- if(_syncPublish)
+ final boolean doSync = _syncPublish || (deliveryMode == DeliveryMode.PERSISTENT && _session.getTxn() == null);
+ if(doSync)
{
action = new DispositionAction(_sender);
}
@@ -315,8 +317,14 @@ public class MessageProducerImpl impleme
jmsException.setLinkedException(e);
throw jmsException;
}
+ catch (TimeoutException e)
+ {
+ JMSException jmsException = new JMSException("Timed out while waiting to get credit to send");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
- if(_syncPublish && !action.wasAccepted(_syncPublishTimeout))
+ if(doSync && !action.wasAccepted(_syncPublishTimeout))
{
if (action.getOutcome() instanceof Rejected)
{
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Tue Jan 14 14:46:35 2014
@@ -81,6 +81,7 @@ public class SessionImpl implements Sess
private boolean _isQueueSession;
private boolean _isTopicSession;
private Transaction _txn;
+ private int _maxPrefetch;
protected SessionImpl(final ConnectionImpl connection, final AcknowledgeMode acknowledgeMode) throws JMSException
{
@@ -523,6 +524,13 @@ public class SessionImpl implements Sess
messageConsumer = new TopicSubscriberImpl(name, true, (org.apache.qpid.amqp_1_0.jms.Topic) topic, this,
selector,
noLocal);
+
+ if(_dispatcherThread == null)
+ {
+ _dispatcherThread = new Thread(_dispatcher);
+ _dispatcherThread.start();
+ }
+
addConsumer(messageConsumer);
if(_connection.isStarted())
{
@@ -836,6 +844,16 @@ public class SessionImpl implements Sess
return _txn;
}
+ public void setMaxPrefetch(final int maxPrefetch)
+ {
+ _maxPrefetch = maxPrefetch;
+ }
+
+ public int getMaxPrefetch()
+ {
+ return _maxPrefetch;
+ }
+
private class Dispatcher implements Runnable
{
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-websocket/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Tue Jan 14 14:46:35 2014
@@ -0,0 +1 @@
+*.iml
Propchange: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client-websocket/resources/
------------------------------------------------------------------------------
Merged /qpid/branches/0.5.x-dev/qpid/java/amqp-1-0-client-websocket/resources:r886720-886722,887145,892761,894875,916304,916325,930288,931179
Merged /qpid/trunk/qpid/amqp-1-0-client-websocket/resources:r796646-796653
Merged /qpid/branches/java-broker-vhost-refactor/java/amqp-1-0-client-websocket/resources:r1493674-1494547
Merged /qpid/branches/qpid-2935/qpid/java/amqp-1-0-client-websocket/resources:r1061302-1072333
Merged /qpid/branches/java-network-refactor/qpid/java/amqp-1-0-client-websocket/resources:r805429-821809
Merged /qpid/branches/0.5.x-dev/amqp-1-0-client-websocket/resources:r886720-886722
Merged /qpid/branches/java-broker-0-10/qpid/java/amqp-1-0-client-websocket/resources:r795950-829653
Merged /qpid/trunk/qpid/java/amqp-1-0-client-websocket/resources:r1556873-1558036
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java Tue Jan 14 14:46:35 2014
@@ -27,6 +27,8 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
+
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.qpid.amqp_1_0.type.Section;
@@ -280,7 +282,8 @@ public class Respond extends Util
}
}
- private void respond(Message m) throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException
+ private void respond(Message m)
+ throws Sender.SenderCreationException, ConnectionClosedException, LinkDetachedException, TimeoutException
{
List<Section> sections = m.getPayload();
String replyTo = null;
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/pom.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/pom.xml?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/pom.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/pom.xml Tue Jan 14 14:46:35 2014
@@ -35,6 +35,17 @@
</dependencies>
<build>
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ <resource>
+ <directory>src/main/java</directory>
+ <includes>
+ <include>resources/</include>
+ </includes>
+ </resource>
+ </resources>
</build>
</project>
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Connection.java Tue Jan 14 14:46:35 2014
@@ -20,33 +20,27 @@
*/
package org.apache.qpid.amqp_1_0.client;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.ByteBuffer;
+import java.security.NoSuchAlgorithmException;
import java.security.Principal;
-import java.util.logging.Level;
-import java.util.logging.Logger;
+import java.util.ServiceLoader;
+import java.util.concurrent.TimeoutException;
-import javax.net.ssl.SSLSocketFactory;
-
-import org.apache.qpid.amqp_1_0.framing.SocketExceptionHandler;
+import org.apache.qpid.amqp_1_0.framing.ExceptionHandler;
import org.apache.qpid.amqp_1_0.framing.ConnectionHandler;
-import org.apache.qpid.amqp_1_0.transport.AMQPTransport;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.Container;
-import org.apache.qpid.amqp_1_0.transport.StateChangeListener;
-import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.transport.Predicate;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.SaslFrameBody;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
import org.apache.qpid.amqp_1_0.type.transport.Error;
-public class Connection implements SocketExceptionHandler
+import javax.net.ssl.SSLContext;
+
+public class Connection implements ExceptionHandler
{
- private static final Logger RAW_LOGGER = Logger.getLogger("RAW");
private static final int MAX_FRAME_SIZE = 65536;
private String _address;
@@ -145,6 +139,20 @@ public class Connection implements Socke
}
+ public Connection(final String protocol,
+ final String address,
+ final int port,
+ final String username,
+ final String password,
+ final Container container,
+ final String remoteHost,
+ final SSLContext sslContext,
+ final int channelMax) throws ConnectionException
+ {
+ this(protocol, address, port, username, password, MAX_FRAME_SIZE,container,remoteHost,sslContext,
+ channelMax);
+ }
+
public Connection(final String address,
final int port,
final String username,
@@ -155,188 +163,127 @@ public class Connection implements Socke
boolean ssl,
int channelMax) throws ConnectionException
{
+ this(ssl?"amqps":"amqp",address,port,username,password,maxFrameSize,container,remoteHostname,getSslContext(ssl),channelMax);
+ }
- _address = address;
-
+ private static SSLContext getSslContext(final boolean ssl) throws ConnectionException
+ {
try
{
- final Socket s;
- if(ssl)
- {
- s = SSLSocketFactory.getDefault().createSocket(address, port);
- }
- else
- {
- s = new Socket(address, port);
- }
-
-
- Principal principal = username == null ? null : new Principal()
- {
-
- public String getName()
- {
- return username;
- }
- };
- _conn = new ConnectionEndpoint(container, principal, password);
- if(channelMax >= 0)
- {
- _conn.setChannelMax((short)channelMax);
- }
- _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
- _conn.setRemoteAddress(s.getRemoteSocketAddress());
- _conn.setRemoteHostname(remoteHostname);
-
-
-
- ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
-
+ return ssl ? SSLContext.getDefault() : null;
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ throw new ConnectionException(e);
+ }
+ }
- final OutputStream outputStream = s.getOutputStream();
+ public Connection(final String protocol,
+ final String address,
+ final int port,
+ final String username,
+ final String password,
+ final int maxFrameSize,
+ final Container container,
+ final String remoteHostname,
+ SSLContext sslContext,
+ int channelMax) throws ConnectionException
+ {
- ConnectionHandler.BytesSource src;
+ _address = address;
- if(_conn.requiresSASL())
- {
- ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)3,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
- new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
- );
+ Principal principal = username == null ? null : new Principal()
+ {
- _conn.setSaslFrameOutput(saslOut);
- }
- else
+ public String getName()
{
- src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
- (byte)'M',
- (byte)'Q',
- (byte)'P',
- (byte)0,
- (byte)1,
- (byte)0,
- (byte)0),
- new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
- );
+ return username;
}
+ };
+ _conn = new ConnectionEndpoint(container, principal, password);
+ if(channelMax >= 0)
+ {
+ _conn.setChannelMax((short)channelMax);
+ }
+ _conn.setDesiredMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
+ _conn.setRemoteHostname(remoteHostname);
+ ConnectionHandler.FrameOutput<FrameBody> out = new ConnectionHandler.FrameOutput<FrameBody>(_conn);
- ConnectionHandler.BytesOutputHandler outputHandler = new ConnectionHandler.BytesOutputHandler(outputStream, src, _conn, this);
- Thread outputThread = new Thread(outputHandler);
- outputThread.setDaemon(true);
- outputThread.start();
- _conn.setFrameOutputHandler(out);
-
-
-
- final ConnectionHandler handler = new ConnectionHandler(_conn);
- final InputStream inputStream = s.getInputStream();
-
- Thread inputThread = new Thread(new Runnable()
- {
-
- public void run()
- {
- try
- {
- doRead(handler, inputStream);
- }
- finally
- {
- if(_conn.closedForInput() && _conn.closedForOutput())
- {
- try
- {
- synchronized (outputStream)
- {
- s.close();
- }
- }
- catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
- }
- });
+ ConnectionHandler.BytesSource src;
- inputThread.setDaemon(true);
- inputThread.start();
+ if(_conn.requiresSASL())
+ {
+ ConnectionHandler.FrameOutput<SaslFrameBody> saslOut = new ConnectionHandler.FrameOutput<SaslFrameBody>(_conn);
- _conn.open();
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)3,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(saslOut,_conn.getDescribedTypeRegistry()),
+ new ConnectionHandler.HeaderBytesSource(_conn, (byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ );
+ _conn.setSaslFrameOutput(saslOut);
}
- catch (IOException e)
+ else
{
- throw new ConnectionException(e);
+ src = new ConnectionHandler.SequentialBytesSource(new ConnectionHandler.HeaderBytesSource(_conn,(byte)'A',
+ (byte)'M',
+ (byte)'Q',
+ (byte)'P',
+ (byte)0,
+ (byte)1,
+ (byte)0,
+ (byte)0),
+ new ConnectionHandler.FrameToBytesSourceAdapter(out,_conn.getDescribedTypeRegistry())
+ );
}
+ TransportProvider transportProvider = getTransportProvider(protocol);
- }
+ transportProvider.connect(_conn,address,port, sslContext, this);
- private Connection(ConnectionEndpoint endpoint)
- {
- _conn = endpoint;
- }
+ _conn.open();
- private void doRead(final AMQPTransport transport, final InputStream inputStream)
- {
- byte[] buf = new byte[2<<15];
- ByteBuffer bbuf = ByteBuffer.wrap(buf);
- final Object lock = new Object();
- transport.setInputStateChangeListener(new StateChangeListener(){
+ }
- public void onStateChange(final boolean active)
- {
- synchronized(lock)
- {
- lock.notifyAll();
- }
- }
- });
+ private TransportProvider getTransportProvider(final String protocol) throws ConnectionException
+ {
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ ServiceLoader<TransportProviderFactory> providerFactories = ServiceLoader.load(TransportProviderFactory.class, classLoader);
- try
+ for(TransportProviderFactory tpf : providerFactories)
{
- int read;
- while((read = inputStream.read(buf)) != -1)
+ if(tpf.getSupportedTransports().contains(protocol))
{
- bbuf.position(0);
- bbuf.limit(read);
-
- while(bbuf.hasRemaining() && transport.isOpenForInput())
- {
- transport.processBytes(bbuf);
- }
-
-
+ return tpf.getProvider(protocol);
}
}
- catch (IOException e)
- {
- e.printStackTrace();
- }
+ throw new ConnectionException("Unknown protocol: " + protocol);
}
+ private Connection(ConnectionEndpoint endpoint)
+ {
+ _conn = endpoint;
+ }
+
+
public Session createSession() throws ConnectionException
{
checkNotClosed();
@@ -357,84 +304,47 @@ public class Connection implements Socke
return _conn;
}
- public void awaitOpen()
+ public void awaitOpen() throws TimeoutException, InterruptedException
{
- synchronized(getEndpoint().getLock())
+ getEndpoint().waitUntil(new Predicate()
{
- while(!getEndpoint().isOpen() && !getEndpoint().isClosed())
+ @Override
+ public boolean isSatisfied()
{
- try
- {
- getEndpoint().getLock().wait();
- }
- catch (InterruptedException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
+ return getEndpoint().isOpen() || getEndpoint().isClosed();
}
- }
+ });
}
- private void doRead(final ConnectionHandler handler, final InputStream inputStream)
+ public void close() throws ConnectionErrorException
{
- byte[] buf = new byte[2<<15];
-
+ _conn.close();
try
{
- int read;
- boolean done = false;
- while(!handler.isDone() && (read = inputStream.read(buf)) != -1)
+ _conn.waitUntil(new Predicate()
{
- ByteBuffer bbuf = ByteBuffer.wrap(buf, 0, read);
- Binary b = new Binary(buf,0,read);
-
- if(RAW_LOGGER.isLoggable(Level.FINE))
- {
- RAW_LOGGER.fine("RECV [" + _conn.getRemoteAddress() + "] : " + b.toString());
- }
- while(bbuf.hasRemaining() && !handler.isDone())
+ @Override
+ public boolean isSatisfied()
{
- handler.parse(bbuf);
+ return _conn.closedForInput();
}
-
-
- }
- if(!handler.isDone())
- {
- _conn.inputClosed();
- if(_conn.getConnectionEventListener() != null)
- {
- _conn.getConnectionEventListener().closeReceived();
- }
- }
+ });
}
- catch (IOException e)
+ catch (InterruptedException e)
{
- _conn.inputClosed();
- e.printStackTrace();
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR, "Interrupted while waiting for connection closure");
}
- }
-
- public void close()
- {
- _conn.close();
-
- synchronized (_conn.getLock())
+ catch (TimeoutException e)
{
- while(!_conn.closedForInput())
- {
- try
- {
- _conn.getLock().wait();
- }
- catch (InterruptedException e)
- {
-
- }
- }
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR, "Timed out while waiting for connection closure");
}
+ if(_conn.getRemoteError() != null)
+ {
+ throw new ConnectionErrorException(_conn.getRemoteError());
+ }
+
}
/**
@@ -458,7 +368,7 @@ public class Connection implements Socke
}
@Override
- public void processSocketException(Exception exception)
+ public void handleException(Exception exception)
{
Error socketError = new Error();
socketError.setDescription(exception.getClass() + ": " + exception.getMessage());
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/ConnectionErrorException.java Tue Jan 14 14:46:35 2014
@@ -20,15 +20,21 @@
*/
package org.apache.qpid.amqp_1_0.client;
+import org.apache.qpid.amqp_1_0.type.ErrorCondition;
import org.apache.qpid.amqp_1_0.type.transport.Error;
public class ConnectionErrorException extends ConnectionException
{
protected final Error _remoteError;
+ public ConnectionErrorException(ErrorCondition condition,final String description)
+ {
+ this(new Error(condition, description));
+ }
+
public ConnectionErrorException(Error remoteError)
{
- super();
+ super(remoteError.getDescription());
_remoteError = remoteError;
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1558056&r1=1558055&r2=1558056&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java Tue Jan 14 14:46:35 2014
@@ -23,6 +23,7 @@ package org.apache.qpid.amqp_1_0.client;
import org.apache.qpid.amqp_1_0.messaging.SectionDecoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.Predicate;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener;
@@ -38,6 +39,7 @@ import org.apache.qpid.amqp_1_0.type.tra
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
public class Receiver implements DeliveryStateHandler
{
@@ -137,36 +139,47 @@ public class Receiver implements Deliver
_endpoint.setLocalUnsettled(unsettled);
_endpoint.attach();
- synchronized(_endpoint.getLock())
+ try
{
- while(!_endpoint.isAttached() && !_endpoint.isDetached())
+ _endpoint.waitUntil(new Predicate()
{
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
+
+ @Override
+ public boolean isSatisfied()
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return _endpoint.isAttached() || _endpoint.isDetached();
}
- }
+ });
+ }
+ catch (TimeoutException e)
+ {
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Timeout waiting for attach");
+ }
+ catch (InterruptedException e)
+ {
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted while waiting for attach");
}
if(_endpoint.getSource() == null)
{
- synchronized(_endpoint.getLock())
+ try
{
- while(!_endpoint.isDetached())
+ _endpoint.waitUntil(new Predicate()
{
- try
- {
- _endpoint.getLock().wait();
- }
- catch (InterruptedException e)
+ @Override
+ public boolean isSatisfied()
{
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ return _endpoint.isDetached();
}
- }
+ });
+ }
+ catch (TimeoutException e)
+ {
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Timeout waiting for detach following failed attach");
+ }
+ catch (InterruptedException e)
+ {
+ throw new ConnectionErrorException(AmqpError.INTERNAL_ERROR,"Interrupted while waiting for detach following failed attach");
}
throw new ConnectionErrorException(getError());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org