You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2017/02/24 22:36:37 UTC
[3/3] qpid-jms git commit: QPIDJMS-267 Failover discovery via
connection properties
QPIDJMS-267 Failover discovery via connection properties
Add support for discovering new failover URIs bsaed on redirect
style Maps in a list provided in the Open frame of a remote peer.
Allow for addition, replacement or ignore of those discovered
hosts by the failover transport and provide an event point for
connection listeners to see the discovered resources.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6295f7e6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6295f7e6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6295f7e6
Branch: refs/heads/master
Commit: 6295f7e6b8fd3ef6ddf4e2cb28d81fd493956282
Parents: 1b8f246
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Feb 24 17:21:03 2017 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Feb 24 17:27:11 2017 -0500
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/JmsConnection.java | 21 +
.../apache/qpid/jms/JmsConnectionListener.java | 9 +
.../jms/provider/DefaultProviderListener.java | 5 +
.../qpid/jms/provider/ProviderFactory.java | 26 +-
.../qpid/jms/provider/ProviderListener.java | 12 +
.../provider/ProviderRedirectedException.java | 49 +-
.../qpid/jms/provider/ProviderWrapper.java | 6 +
.../jms/provider/amqp/AmqpAbstractResource.java | 2 +-
.../qpid/jms/provider/amqp/AmqpConnection.java | 2 +-
.../provider/amqp/AmqpConnectionProperties.java | 35 +-
.../jms/provider/amqp/AmqpFixedProducer.java | 2 +-
.../qpid/jms/provider/amqp/AmqpProvider.java | 52 +-
.../jms/provider/amqp/AmqpProviderFactory.java | 41 +-
.../qpid/jms/provider/amqp/AmqpRedirect.java | 217 ++++
.../qpid/jms/provider/amqp/AmqpSupport.java | 43 +-
.../amqp/AmqpTransactionCoordinator.java | 3 +-
.../amqp/builders/AmqpConnectionBuilder.java | 21 +
.../amqp/builders/AmqpResourceBuilder.java | 2 +-
.../jms/provider/failover/FailoverProvider.java | 79 +-
.../jms/provider/failover/FailoverUriPool.java | 57 +-
.../qpid/jms/transports/TransportFactory.java | 7 +
.../netty/NettySslTransportFactory.java | 5 +
.../netty/NettyWssTransportFactory.java | 5 +
.../org/apache/qpid/jms/util/PropertyUtil.java | 9 +-
.../org/apache/qpid/jms/util/URISupport.java | 22 +-
.../services/org/apache/qpid/jms/provider/amqp | 3 +-
.../services/org/apache/qpid/jms/provider/amqps | 3 +-
.../org/apache/qpid/jms/provider/amqpws | 3 +-
.../org/apache/qpid/jms/provider/amqpwss | 3 +-
.../org/apache/qpid/jms/provider/redirects/ws | 19 +
.../org/apache/qpid/jms/provider/redirects/wss | 19 +
.../qpid/jms/JmsDefaultConnectionListener.java | 5 +
.../integration/ConnectionIntegrationTest.java | 9 +-
.../FailedConnectionsIntegrationTest.java | 12 +-
.../jms/provider/amqp/AmqpProviderTest.java | 58 +-
.../qpid/jms/provider/amqp/AmqpSupportTest.java | 52 +-
.../provider/failover/FailoverProviderTest.java | 27 +
.../provider/failover/FailoverRedirectTest.java | 37 +-
.../provider/failover/FailoverUriPoolTest.java | 40 +
...qpOpenProvidedServerListIntegrationTest.java | 1142 ++++++++++++++++++
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 7 +-
.../jms/test/testpeer/TestAmqpPeerRunner.java | 4 +
.../netty/NettyTcpToMockServerTest.java | 4 +-
.../apache/qpid/jms/util/URISupportTest.java | 19 +-
qpid-jms-docs/Configuration.md | 2 +
.../JmsConsumerPriorityDispatchTest.java | 9 +
.../jms/discovery/FileWatcherDiscoveryTest.java | 5 +
.../jms/discovery/JmsAmqpDiscoveryTest.java | 5 +
.../transactions/JmsTransactedConsumerTest.java | 4 +
49 files changed, 2019 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 3c0f918..8294570 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms;
import java.io.IOException;
import java.net.URI;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -1369,6 +1370,26 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
onAsyncException(cause);
}
+ @Override
+ public void onRemoteDiscovery(final List<URI> remotes) {
+ for (URI remote : remotes) {
+ LOG.trace("Discovered new remote at: {}", remote);
+ }
+
+ // Give listeners a chance to know what we've discovered.
+ if (!connectionListeners.isEmpty()) {
+ for (final JmsConnectionListener listener : connectionListeners) {
+ executor.submit(new Runnable() {
+
+ @Override
+ public void run() {
+ listener.onRemoteDiscovery(remotes);
+ }
+ });
+ }
+ }
+ }
+
/**
* Handles any asynchronous errors that occur from the JMS framework classes.
*
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
index f9389f4..b25a2b1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
@@ -17,6 +17,7 @@
package org.apache.qpid.jms;
import java.net.URI;
+import java.util.List;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -104,4 +105,12 @@ public interface JmsConnectionListener {
*/
void onProducerClosed(MessageProducer producer, Throwable cause);
+ /**
+ * Called when additional remote peers are discovered by this connection.
+ *
+ * @param remotes
+ * A list of remote peers that have been discovered.
+ */
+ void onRemoteDiscovery(List<URI> remotes);
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
index 1d98e01..6fe2334 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/DefaultProviderListener.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider;
import java.io.IOException;
import java.net.URI;
+import java.util.List;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
@@ -71,4 +72,8 @@ public class DefaultProviderListener implements ProviderListener {
@Override
public void onProviderException(Exception cause) {
}
+
+ @Override
+ public void onRemoteDiscovery(List<URI> remotes) {
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
index c18b336..19a41db 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderFactory.java
@@ -48,7 +48,7 @@ public abstract class ProviderFactory {
public abstract Provider createProvider(URI remoteURI) throws Exception;
/**
- * @return the name of this JMS Provider.
+ * @return the name of this Provider.
*/
public abstract String getName();
@@ -92,8 +92,28 @@ public abstract class ProviderFactory {
* @throws IOException if an error occurs while locating the factory.
*/
public static ProviderFactory findProviderFactory(URI location) throws IOException {
- String scheme = location.getScheme();
- if (scheme == null) {
+ if (location == null) {
+ throw new IOException("No Provider location specified.");
+ }
+
+ return findProviderFactory(location.getScheme());
+ }
+
+ /**
+ * Searches for a ProviderFactory by using the scheme given.
+ *
+ * The search first checks the local cache of provider factories before moving on
+ * to search in the class path.
+ *
+ * @param scheme
+ * The URI scheme that describes the desired factory.
+ *
+ * @return a provider factory instance matching the scheme.
+ *
+ * @throws IOException if an error occurs while locating the factory.
+ */
+ public static ProviderFactory findProviderFactory(String scheme) throws IOException {
+ if (scheme == null || scheme.isEmpty()) {
throw new IOException("No Provider scheme specified.");
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
index a96de4f..412242e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderListener.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider;
import java.io.IOException;
import java.net.URI;
+import java.util.List;
import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
@@ -155,4 +156,15 @@ public interface ProviderListener {
*/
void onProviderException(Exception cause);
+ /**
+ * Called when additional remote peers are discovered.
+ * <p>
+ * If new peers are discovered their URIs are provided to listeners to allow for
+ * failover or update of client connection information.
+ *
+ * @param remotes
+ * A list of remote peers that have been discovered.
+ */
+ void onRemoteDiscovery(List<URI> remotes);
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
index ff997a8..3ab6459 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderRedirectedException.java
@@ -17,6 +17,7 @@
package org.apache.qpid.jms.provider;
import java.io.IOException;
+import java.net.URI;
/**
* {@link IOException} derivative that defines that the remote peer has requested that this
@@ -26,54 +27,18 @@ public class ProviderRedirectedException extends IOException {
private static final long serialVersionUID = 5872211116061710369L;
- private final String hostname;
- private final String networkHost;
- private final String scheme;
- private final String path;
- private final int port;
+ private final URI redirect;
- public ProviderRedirectedException(String reason, String scheme, String hostname, String networkHost, int port, String path) {
+ public ProviderRedirectedException(String reason, URI redirect) {
super(reason);
- this.scheme = scheme;
- this.hostname = hostname;
- this.networkHost = networkHost;
- this.port = port;
- this.path = path;
+ this.redirect = redirect;
}
/**
- * @return the host name of the container being redirected to.
+ * @return the URI that represents the redirection.
*/
- public String getHostname() {
- return hostname;
- }
-
- /**
- * @return the DNS host name or IP address of the peer this connection is being redirected to.
- */
- public String getNetworkHost() {
- return networkHost;
- }
-
- /**
- * @return the port number on the peer this connection is being redirected to.
- */
- public int getPort() {
- return port;
- }
-
- /**
- * @return the scheme that the remote indicated the redirect connection should use.
- */
- public String getScheme() {
- return scheme;
- }
-
- /**
- * @return the path that the remote indicated should be path of the redirect URI.
- */
- public String getPath() {
- return path;
+ public URI getRedirectionURI() {
+ return redirect;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
index 74bfa16..2e3efc8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/ProviderWrapper.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider;
import java.io.IOException;
import java.net.URI;
+import java.util.List;
import javax.jms.JMSException;
@@ -204,6 +205,11 @@ public class ProviderWrapper<E extends Provider> implements Provider, ProviderLi
listener.onProviderException(cause);
}
+ @Override
+ public void onRemoteDiscovery(List<URI> remotes) {
+ listener.onRemoteDiscovery(remotes);
+ }
+
/**
* @return the wrapped Provider.
*/
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
index 3a8cf3d..11a14a0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAbstractResource.java
@@ -259,7 +259,7 @@ public abstract class AmqpAbstractResource<R extends JmsResource, E extends Endp
if (isAwaitingClose()) {
closeResource(provider, null, true); // Close was expected so ignore any endpoint errors.
} else {
- closeResource(provider, AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition()), true);
+ closeResource(provider, AmqpSupport.convertToException(provider, getEndpoint(), getEndpoint().getRemoteCondition()), true);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
index 48271bf..c93107b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java
@@ -66,7 +66,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn
this.amqpMessageFactory = new AmqpJmsMessageFactory(this);
// Create connection properties initialized with defaults from the JmsConnectionInfo
- this.properties = new AmqpConnectionProperties(info);
+ this.properties = new AmqpConnectionProperties(info, provider);
}
public void createSession(JmsSessionInfo sessionInfo, AsyncResult request) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
index 815104a..99f0063 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
@@ -19,10 +19,12 @@ package org.apache.qpid.jms.provider.amqp;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.QUEUE_PREFIX;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SHARED_SUBS;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TOPIC_PREFIX;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -42,11 +44,13 @@ public class AmqpConnectionProperties {
private static final Logger LOG = LoggerFactory.getLogger(AmqpConnectionProperties.class);
private final JmsConnectionInfo connectionInfo;
+ private final AmqpProvider provider;
private boolean delayedDeliverySupported = false;
private boolean anonymousRelaySupported = false;
private boolean sharedSubsSupported = false;
private boolean connectionOpenFailed = false;
+ private final List<AmqpRedirect> failoverServerList = new ArrayList<>();
/**
* Creates a new instance of this class with default values read from the
@@ -54,9 +58,12 @@ public class AmqpConnectionProperties {
*
* @param connectionInfo
* the JmsConnectionInfo object used to populate defaults.
+ * @param provider
+ * the provider instance associated with this object
*/
- public AmqpConnectionProperties(JmsConnectionInfo connectionInfo) {
+ public AmqpConnectionProperties(JmsConnectionInfo connectionInfo, AmqpProvider provider) {
this.connectionInfo = connectionInfo;
+ this.provider = provider;
}
/**
@@ -92,6 +99,7 @@ public class AmqpConnectionProperties {
}
}
+ @SuppressWarnings("unchecked")
protected void processProperties(Map<Symbol, Object> properties) {
if (properties.containsKey(QUEUE_PREFIX)) {
Object o = properties.get(QUEUE_PREFIX);
@@ -113,6 +121,31 @@ public class AmqpConnectionProperties {
LOG.trace("Remote sent Connection Establishment Failed marker.");
connectionOpenFailed = true;
}
+
+ if (properties.containsKey(FAILOVER_SERVER_LIST)) {
+ LOG.trace("Remote sent Failover Server List.");
+ Object o = properties.get(FAILOVER_SERVER_LIST);
+ if (o instanceof List) {
+ for (Map<Symbol, Object> redirection : (List<Map<Symbol, Object>>) o) {
+ try {
+ failoverServerList.add(new AmqpRedirect(redirection, provider).validate());
+ } catch (Exception ex) {
+ LOG.debug("Invalid redirection value given in failover server list: {}", ex.getMessage());
+ }
+ }
+
+ LOG.trace("Failover Server List: {}", failoverServerList);
+ }
+ }
+ }
+
+ /**
+ * Get any advertised failover server list details.
+ *
+ * @return return the advertised failover server list details, list is empty if no server list given.
+ */
+ public List<AmqpRedirect> getFailoverServerList() {
+ return failoverServerList;
}
/**
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 9b5bc71..4acd419 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -237,7 +237,7 @@ public class AmqpFixedProducer extends AmqpProducer {
remoteError = getEndpoint().getRemoteCondition();
}
- deliveryError = AmqpSupport.convertToException(getEndpoint(), remoteError);
+ deliveryError = AmqpSupport.convertToException(getParent().getProvider(), getEndpoint(), remoteError);
} else if (outcome instanceof Released) {
LOG.trace("Outcome of delivery was released: {}", delivery);
deliveryError = new JMSException("Delivery failed: released by receiver");
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 1c54ca6..d5d5ade 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -19,6 +19,7 @@ package org.apache.qpid.jms.provider.amqp;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -55,7 +56,7 @@ import org.apache.qpid.jms.provider.ProviderFuture;
import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.amqp.builders.AmqpClosedConnectionBuilder;
import org.apache.qpid.jms.provider.amqp.builders.AmqpConnectionBuilder;
-import org.apache.qpid.jms.transports.TransportFactory;
+import org.apache.qpid.jms.transports.Transport;
import org.apache.qpid.jms.transports.TransportListener;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.jms.util.ThreadPoolUtils;
@@ -66,7 +67,6 @@ import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Event.Type;
import org.apache.qpid.proton.engine.Sasl;
-import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
@@ -106,8 +106,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
private volatile ProviderListener listener;
private AmqpConnection connection;
private AmqpSaslAuthenticator authenticator;
- private volatile org.apache.qpid.jms.transports.Transport transport;
- private String transportType = AmqpProviderFactory.DEFAULT_TRANSPORT_TYPE;
+ private final Transport transport;
private String vhost;
private boolean traceFrames;
private boolean traceBytes;
@@ -117,13 +116,16 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
private int channelMax = DEFAULT_CHANNEL_MAX;
private int idleTimeout = 60000;
private int drainTimeout = 60000;
- private long sessionOutoingWindow = -1; //Use proton default
+ private long sessionOutoingWindow = -1; // Use proton default
private int maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
+ private boolean allowNonSecureRedirects;
+
private final URI remoteURI;
private final AtomicBoolean closed = new AtomicBoolean();
private ScheduledThreadPoolExecutor serializer;
- private final Transport protonTransport = Transport.Factory.create();
+ private final org.apache.qpid.proton.engine.Transport protonTransport =
+ org.apache.qpid.proton.engine.Transport.Factory.create();
private final Collector protonCollector = new CollectorImpl();
private final Connection protonConnection = Connection.Factory.create();
@@ -135,9 +137,12 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
*
* @param remoteURI
* The URI of the AMQP broker this Provider instance will connect to.
+ * @param transport
+ * The underlying Transport that will be used for wire level communications.
*/
- public AmqpProvider(URI remoteURI) {
+ public AmqpProvider(URI remoteURI, Transport transport) {
this.remoteURI = remoteURI;
+ this.transport = transport;
serializer = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
@@ -184,7 +189,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
SSLContext sslContextOverride = connectionInfo.getSslContextOverride();
- transport = TransportFactory.create(getTransportType(), getRemoteURI());
transport.setTransportListener(AmqpProvider.this);
transport.connect(sslContextOverride);
@@ -1023,6 +1027,13 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
}
}
+ public void fireRemotesDiscovered(List<URI> remotes) {
+ ProviderListener listener = this.listener;
+ if (listener != null) {
+ listener.onRemoteDiscovery(remotes);
+ }
+ }
+
@Override
public void addChildResource(AmqpResource resource) {
if (resource instanceof AmqpConnection) {
@@ -1166,6 +1177,21 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
this.sessionOutoingWindow = sessionOutoingWindow;
}
+ public boolean isAllowNonSecureRedirects() {
+ return allowNonSecureRedirects;
+ }
+
+ /**
+ * Should the AMQP connection allow a redirect or failover server update that redirects
+ * from a secure connection to an non-secure one (SSL to TCP).
+ *
+ * @param allowNonSecureRedirects
+ * the allowNonSecureRedirects value to apply to this AMQP connection.
+ */
+ public void setAllowNonSecureRedirects(boolean allowNonSecureRedirects) {
+ this.allowNonSecureRedirects = allowNonSecureRedirects;
+ }
+
public long getCloseTimeout() {
return connectionInfo != null ? connectionInfo.getCloseTimeout() : JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
}
@@ -1195,12 +1221,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
this.channelMax = channelMax;
}
- String getTransportType() {
- return transportType;
- }
-
- void setTransportType(String transportType) {
- this.transportType = transportType;
+ public Transport getTransport() {
+ return transport;
}
@Override
@@ -1218,7 +1240,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP
return remoteURI;
}
- public Transport getProtonTransport() {
+ public org.apache.qpid.proton.engine.Transport getProtonTransport() {
return protonTransport;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
index 5f81c6f..5675277 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactory.java
@@ -19,8 +19,9 @@ package org.apache.qpid.jms.provider.amqp;
import java.net.URI;
import java.util.Map;
-import org.apache.qpid.jms.provider.Provider;
import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.transports.Transport;
+import org.apache.qpid.jms.transports.TransportFactory;
import org.apache.qpid.jms.util.PropertyUtil;
/**
@@ -28,21 +29,22 @@ import org.apache.qpid.jms.util.PropertyUtil;
*/
public class AmqpProviderFactory extends ProviderFactory {
- public static final String DEFAULT_TRANSPORT_TYPE = "tcp";
+ public static final String DEFAULT_TRANSPORT_SCHEME = "tcp";
+ public static final String DEFAULT_PROVIDER_SCHEME = "amqp";
- private String transportType = DEFAULT_TRANSPORT_TYPE;
+ private String transportScheme = DEFAULT_TRANSPORT_SCHEME;
+ private String providerScheme = DEFAULT_PROVIDER_SCHEME;
@Override
- public Provider createProvider(URI remoteURI) throws Exception {
+ public AmqpProvider createProvider(URI remoteURI) throws Exception {
Map<String, String> map = PropertyUtil.parseQuery(remoteURI.getQuery());
Map<String, String> providerOptions = PropertyUtil.filterProperties(map, "amqp.");
- remoteURI = PropertyUtil.replaceQuery(remoteURI, map);
+ // Clear off any amqp.X values from the transport before creation.
+ Transport transport = TransportFactory.create(getTransportScheme(), PropertyUtil.replaceQuery(remoteURI, map));
- AmqpProvider result = new AmqpProvider(remoteURI);
-
- result.setTransportType(getTransportType());
+ AmqpProvider result = new AmqpProvider(remoteURI, transport);
Map<String, String> unused = PropertyUtil.setProperties(result, providerOptions);
if (!unused.isEmpty()) {
@@ -62,18 +64,27 @@ public class AmqpProviderFactory extends ProviderFactory {
return "AMQP";
}
+ public String getTransportScheme() {
+ return transportScheme;
+ }
+
/**
- * @return the transport type used for this provider factory such as 'tcp' or 'ssl'
+ * @param transportScheme
+ * the transport type name to use when creating a new provider.
*/
- public String getTransportType() {
- return transportType;
+ public void setTransportScheme(String transportScheme) {
+ this.transportScheme = transportScheme;
+ }
+
+ public String getProviderScheme() {
+ return providerScheme;
}
/**
- * @param transportType
- * the transport type name to use when creating a new provider.
+ * @param providerScheme
+ * the providerScheme to use to identify the AMQP provider
*/
- public void setTransportType(String transportType) {
- this.transportType = transportType;
+ public void setProviderScheme(String providerScheme) {
+ this.providerScheme = providerScheme;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpRedirect.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpRedirect.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpRedirect.java
new file mode 100644
index 0000000..2e2588a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpRedirect.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.NETWORK_HOST;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.OPEN_HOSTNAME;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PATH;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SCHEME;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.transports.TransportFactory;
+import org.apache.qpid.jms.util.FactoryFinder;
+import org.apache.qpid.jms.util.PropertyUtil;
+import org.apache.qpid.jms.util.URISupport;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Encapsulates the AMQP Redirect Map
+ */
+public class AmqpRedirect {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AmqpRedirect.class);
+
+ private static final FactoryFinder<ProviderFactory> PROVIDER_FACTORY_FINDER =
+ new FactoryFinder<ProviderFactory>(ProviderFactory.class,
+ "META-INF/services/" + ProviderFactory.class.getPackage().getName().replace(".", "/") + "/redirects/");
+
+ private final Map<Symbol, Object> redirect;
+ private final AmqpProvider provider;
+
+ public AmqpRedirect(Map<Symbol, Object> redirect, AmqpProvider provider) {
+ this.redirect = redirect;
+ this.provider = provider;
+
+ if (provider == null) {
+ throw new IllegalArgumentException("A provider instance is required");
+ }
+
+ URI remoteURI = provider.getRemoteURI();
+ if (remoteURI == null || remoteURI.getScheme() == null || remoteURI.getScheme().isEmpty()) {
+ throw new IllegalArgumentException("The provider instance must provide a valid scheme");
+ }
+ }
+
+ public AmqpRedirect validate() throws Exception {
+ String networkHost = (String) redirect.get(NETWORK_HOST);
+ if (networkHost == null || networkHost.isEmpty()) {
+ throw new IOException("Redirection information not set, missing network host.");
+ }
+
+ try {
+ Integer.parseInt(redirect.get(PORT).toString());
+ } catch (Exception ex) {
+ throw new IOException("Redirection information contained invalid port.");
+ }
+
+ String sourceScheme = provider.getRemoteURI().getScheme();
+ String scheme = (String) redirect.get(SCHEME);
+ if (scheme != null && !scheme.isEmpty() && !scheme.equals(sourceScheme)) {
+
+ // Attempt to located a provider using normal scheme (amqp, amqps, etc...)
+ ProviderFactory factory = null;
+ try {
+ factory = ProviderFactory.findProviderFactory(scheme);
+ } catch (Throwable error) {
+ LOG.trace("Couldn't find AMQP prefixed Provider using scheme: {}", scheme);
+ }
+
+ if (factory == null) {
+ // Attempt to located a transport level redirect (ws, wss, etc...)
+ try {
+ factory = findProviderFactoryByTransportScheme(scheme);
+ } catch (Throwable error) {
+ LOG.trace("Couldn't find Provider using transport scheme: {}", scheme);
+ }
+ }
+
+ if (factory == null || !(factory instanceof AmqpProviderFactory)) {
+ throw new IOException("Redirect contained an unknown provider scheme: " + scheme);
+ }
+
+ LOG.trace("Found provider: {} for redirect: {}", factory.getName(), scheme);
+
+ AmqpProviderFactory amqpFactory = (AmqpProviderFactory) factory;
+ String transportType = amqpFactory.getTransportScheme();
+
+ if (transportType == null || transportType.isEmpty()) {
+ throw new IOException("Redirect contained an unknown provider scheme: " + scheme);
+ }
+
+ TransportFactory transportFactory = TransportFactory.findTransportFactory(transportType);
+ if (transportFactory == null) {
+ throw new IOException("Redirect contained an unknown provider scheme: " + scheme);
+ }
+
+ // Check for insecure redirect and whether it is allowed.
+ if (provider.getTransport().isSecure() && !transportFactory.isSecure() && !provider.isAllowNonSecureRedirects()) {
+ throw new IOException("Attempt to redirect to an insecure connection type: " + transportType);
+ }
+
+ // Update the redirect information with the resolved target scheme used to create
+ // the provider for the redirection.
+ redirect.put(SCHEME, amqpFactory.getProviderScheme());
+ }
+
+ // Check it actually converts to URI since we require it do so later
+ toURI();
+
+ return this;
+ }
+
+ /**
+ * @return the redirection map that backs this object
+ */
+ public Map<Symbol, Object> getRedirectMap() {
+ return redirect;
+ }
+
+ /**
+ * @return the host name of the container being redirected to.
+ */
+ public String getHostname() {
+ return (String) redirect.get(OPEN_HOSTNAME);
+ }
+
+ /**
+ * @return the DNS host name or IP address of the peer this connection is being redirected to.
+ */
+ public String getNetworkHost() {
+ return (String) redirect.get(NETWORK_HOST);
+ }
+
+ /**
+ * @return the port number on the peer this connection is being redirected to.
+ */
+ public int getPort() {
+ return Integer.parseInt(redirect.get(PORT).toString());
+ }
+
+ /**
+ * @return the scheme that the remote indicated the redirect connection should use.
+ */
+ public String getScheme() {
+ String scheme = (String) redirect.get(SCHEME);
+ if (scheme == null || scheme.isEmpty()) {
+ scheme = provider.getRemoteURI().getScheme();
+ }
+
+ return scheme;
+ }
+
+ /**
+ * @return the path that the remote indicated should be path of the redirect URI.
+ */
+ public String getPath() {
+ return (String) redirect.get(PATH);
+ }
+
+ /**
+ * Construct a URI from the redirection information available.
+ *
+ * @return a URI that matches the redirection information provided.
+ *
+ * @throws Exception if an error occurs construct a URI from the redirection information.
+ */
+ public URI toURI() throws Exception {
+ Map<String, String> queryOptions = PropertyUtil.parseQuery(provider.getRemoteURI());
+
+ URI result = new URI(getScheme(), null, getNetworkHost(), getPort(), getPath(), null, null);
+
+ String hostname = getHostname();
+ if (hostname != null && !hostname.isEmpty()) {
+ // Ensure we replace any existing vhost option with the redirect version.
+ queryOptions = new LinkedHashMap<>(queryOptions);
+ queryOptions.put("amqp.vhost", hostname);
+ }
+
+ return URISupport.applyParameters(result, queryOptions);
+ }
+
+ private static ProviderFactory findProviderFactoryByTransportScheme(String scheme) throws IOException {
+ if (scheme == null || scheme.isEmpty()) {
+ throw new IOException("No Transport scheme specified.");
+ }
+
+ ProviderFactory factory = null;
+ try {
+ factory = PROVIDER_FACTORY_FINDER.newInstance(scheme);
+ } catch (Throwable e) {
+ throw new IOException("Provider NOT found using redirect scheme: [" + scheme + "]", e);
+ }
+
+ return factory;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
index 7af1de4..62c5001 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
@@ -51,6 +51,9 @@ public class AmqpSupport {
public static final Symbol INVALID_FIELD = Symbol.valueOf("invalid-field");
public static final Symbol CONTAINER_ID = Symbol.valueOf("container-id");
+ // Symbols used to announce failover server list (in addition to redirect symbols below)
+ public static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list");
+
// Symbols used to announce connection redirect ErrorCondition 'info'
public static final Symbol PATH = Symbol.valueOf("path");
public static final Symbol SCHEME = Symbol.valueOf("scheme");
@@ -101,6 +104,8 @@ public class AmqpSupport {
* Given an ErrorCondition instance create a new Exception that best matches
* the error type.
*
+ * @param provider
+ * the AMQP provider instance that originates this exception
* @param endpoint
* The target of the error.
* @param errorCondition
@@ -108,14 +113,16 @@ public class AmqpSupport {
*
* @return a new Exception instance that best matches the ErrorCondition value.
*/
- public static Exception convertToException(Endpoint endpoint, ErrorCondition errorCondition) {
- return convertToException(endpoint, errorCondition, null);
+ public static Exception convertToException(AmqpProvider provider, Endpoint endpoint, ErrorCondition errorCondition) {
+ return convertToException(provider, endpoint, errorCondition, null);
}
/**
* Given an ErrorCondition instance create a new Exception that best matches
* the error type.
*
+ * @param provider
+ * the AMQP provider instance that originates this exception
* @param endpoint
* The target of the error.
* @param errorCondition
@@ -125,7 +132,7 @@ public class AmqpSupport {
*
* @return a new Exception instance that best matches the ErrorCondition value.
*/
- public static Exception convertToException(Endpoint endpoint, ErrorCondition errorCondition, Exception defaultException) {
+ public static Exception convertToException(AmqpProvider provider, Endpoint endpoint, ErrorCondition errorCondition, Exception defaultException) {
Exception remoteError = defaultException;
if (errorCondition != null && errorCondition.getCondition() != null) {
@@ -145,7 +152,7 @@ public class AmqpSupport {
} else if (error.equals(TransactionErrors.TRANSACTION_ROLLBACK)) {
remoteError = new TransactionRolledBackException(message);
} else if (error.equals(ConnectionError.REDIRECT)) {
- remoteError = createRedirectException(error, message, errorCondition);
+ remoteError = createRedirectException(provider, error, message, errorCondition);
} else if (error.equals(AmqpError.INVALID_FIELD)) {
Map<?, ?> info = errorCondition.getInfo();
if (info != null && CONTAINER_ID.equals(info.get(INVALID_FIELD))) {
@@ -192,6 +199,8 @@ public class AmqpSupport {
* When a redirect type exception is received this method is called to create the
* appropriate redirect exception type containing the error details needed.
*
+ * @param provider
+ * the AMQP provider instance that originates this exception
* @param error
* the Symbol that defines the redirection error type.
* @param message
@@ -201,32 +210,20 @@ public class AmqpSupport {
*
* @return an Exception that captures the details of the redirection error.
*/
- public static Exception createRedirectException(Symbol error, String message, ErrorCondition condition) {
+ public static Exception createRedirectException(AmqpProvider provider, Symbol error, String message, ErrorCondition condition) {
Exception result = null;
Map<?, ?> info = condition.getInfo();
if (info == null) {
result = new IOException(message + " : Redirection information not set.");
} else {
- String hostname = (String) info.get(OPEN_HOSTNAME);
- String path = (String) info.get(PATH);
- String scheme = (String) info.get(SCHEME);
-
- String networkHost = (String) info.get(NETWORK_HOST);
- int port = 0;
-
- if (networkHost == null || networkHost.isEmpty()) {
- result = new IOException(message + " : Redirection information not set.");
- } else {
- try {
- port = Integer.parseInt(info.get(PORT).toString());
- } catch (Exception ex) {
- result = new IOException(message + " : Redirection information not set.");
- }
- }
+ @SuppressWarnings("unchecked")
+ AmqpRedirect redirect = new AmqpRedirect((Map<Symbol, Object>) info, provider);
- if (result == null) {
- result = new ProviderRedirectedException(message, scheme, hostname, networkHost, port, path);
+ try {
+ result = new ProviderRedirectedException(message, redirect.validate().toURI());
+ } catch (Exception ex) {
+ result = new IOException(message + " : " + ex.getMessage());
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
index f567b09..f6a57cb 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionCoordinator.java
@@ -80,7 +80,8 @@ public class AmqpTransactionCoordinator extends AmqpAbstractResource<JmsSessionI
} else if (state instanceof Rejected) {
LOG.debug("Last TX request failed: {}", txId);
Rejected rejected = (Rejected) state;
- Exception cause = AmqpSupport.convertToException(getEndpoint(), rejected.getError());
+ Exception cause = AmqpSupport.convertToException(
+ getParent().getProvider(), getEndpoint(), rejected.getError());
JMSException failureCause = null;
if (txId.getProviderContext().equals(COMMIT_MARKER)) {
failureCause = new TransactionRolledBackException(cause.getMessage());
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
index 5792c09..98cabfc 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java
@@ -18,7 +18,10 @@ package org.apache.qpid.jms.provider.amqp.builders;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SOLE_CONNECTION_CAPABILITY;
+import java.net.URI;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import javax.jms.Session;
@@ -28,6 +31,7 @@ import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.provider.AsyncResult;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpProvider;
+import org.apache.qpid.jms.provider.amqp.AmqpRedirect;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.util.MetaDataSupport;
import org.apache.qpid.proton.amqp.Symbol;
@@ -129,6 +133,23 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A
// be determined, this allows us to check for close pending.
getResource().getProperties().initialize(
getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties());
+
+ // If there are failover servers in the open then we signal that to the listeners
+ List<AmqpRedirect> failoverList = getResource().getProperties().getFailoverServerList();
+ if (!failoverList.isEmpty()) {
+ List<URI> failoverURIs = new ArrayList<>();
+ for (AmqpRedirect redirect : failoverList) {
+ try {
+ failoverURIs.add(redirect.toURI());
+ } catch (Exception ex) {
+ LOG.trace("Error while creating URI from failover server: {}", redirect);
+ }
+ }
+
+ if (!failoverURIs.isEmpty()) {
+ getResource().getProvider().fireRemotesDiscovered(failoverURIs);
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
index e984e7b..69d5f04 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java
@@ -169,7 +169,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex
Throwable openError;
if (hasRemoteError()) {
- openError = AmqpSupport.convertToException(getEndpoint(), getEndpoint().getRemoteCondition());
+ openError = AmqpSupport.convertToException(parent.getProvider(), getEndpoint(), getEndpoint().getRemoteCondition());
} else if (cause != null) {
openError = cause;
} else {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index ec3b1fb..45d6087 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -122,6 +123,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
private int startupMaxReconnectAttempts = DEFAULT_STARTUP_MAX_RECONNECT_ATTEMPTS;
private int warnAfterReconnectAttempts = DEFAULT_WARN_AFTER_RECONNECT_ATTEMPTS;
+ private FailoverServerListBehaviour amqpOpenServerListBehaviour = FailoverServerListBehaviour.REPLACE;
+
public FailoverProvider(Map<String, String> nestedOptions) {
this(null, nestedOptions);
}
@@ -556,7 +559,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
if (cause instanceof ProviderRedirectedException) {
ProviderRedirectedException redirect = (ProviderRedirectedException) cause;
try {
- uris.addFirst(buildRedirectURI(failedURI, redirect));
+ uris.addFirst(redirect.getRedirectionURI());
} catch (Exception error) {
LOG.warn("Could not construct redirection URI from remote provided information");
}
@@ -785,17 +788,6 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
}
}
- protected URI buildRedirectURI(URI sourceURI, ProviderRedirectedException redirect) throws Exception {
- String scheme = sourceURI.getScheme();
- String host = redirect.getNetworkHost();
- String path = redirect.getPath();
- int port = redirect.getPort();
-
- URI result = new URI(scheme, null, host, port, path, null, null);
-
- return result;
- }
-
//--------------- DefaultProviderListener overrides ----------------------//
@Override
@@ -857,6 +849,57 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
});
}
+ @Override
+ public void onRemoteDiscovery(final List<URI> discovered) {
+ if (closingConnection.get() || closed.get() || failed.get()) {
+ return;
+ }
+
+ if (discovered == null || discovered.isEmpty()) {
+ return;
+ }
+
+ serializer.execute(new Runnable() {
+ @Override
+ public void run() {
+ if (!closingConnection.get() && !closed.get() && !failed.get()) {
+
+ List<URI> newRemotes = new ArrayList<URI>(discovered);
+ switch (amqpOpenServerListBehaviour) {
+ case ADD:
+ try {
+ uris.addAll(discovered);
+ } catch (Throwable err) {
+ LOG.warn("Error while attempting to add discovered URIs: {}", discovered);
+ }
+ break;
+ case REPLACE:
+ // The current server is assumed not to be in the list of updated remote
+ // as it is meant for the failover nodes. The pool will de-dup if it is.
+ newRemotes.add(0, connectedURI);
+ try {
+ uris.replaceAll(newRemotes);
+ } catch (Throwable err) {
+ LOG.warn("Error while attempting to add discovered URIs: {}", discovered);
+ }
+ break;
+ case IGNORE:
+ // Do Nothing
+ break;
+ default:
+ // Shouldnt get here, but do nothing if we do.
+ break;
+ }
+
+ // Inform any listener that we've made a new discovery.
+ if (listener != null) {
+ listener.onRemoteDiscovery(discovered);
+ }
+ }
+ }
+ });
+ }
+
//--------------- URI update and rebalance methods -----------------------//
public void add(final URI uri) {
@@ -999,6 +1042,14 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
return this.requestTimeout;
}
+ public String getAmqpOpenServerListBehaviour() {
+ return amqpOpenServerListBehaviour.toString();
+ }
+
+ public void setAmqpOpenServerListBehaviour(String amqpOpenServerListBehaviour) {
+ this.amqpOpenServerListBehaviour = FailoverServerListBehaviour.valueOf(amqpOpenServerListBehaviour.toUpperCase(Locale.ENGLISH));
+ }
+
public Map<String, String> getNestedOptions() {
return uris.getNestedOptions();
}
@@ -1216,4 +1267,8 @@ public class FailoverProvider extends DefaultProviderListener implements Provide
super.onSuccess();
}
}
+
+ private static enum FailoverServerListBehaviour {
+ ADD, REPLACE, IGNORE
+ };
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
index c01bf06..3740da6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverUriPool.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -167,6 +168,25 @@ public class FailoverUriPool {
/**
* Adds a new URI to the pool if not already contained within. The URI will have
+ * any nest options that have been configured added to its existing set of options.
+ *
+ * @param uris
+ * The new list of URIs to add to the pool.
+ */
+ public void addAll(List<URI> uris) {
+ if (uris == null || uris.isEmpty()) {
+ return;
+ }
+
+ synchronized (uris) {
+ for (URI uri : uris) {
+ add(uri);
+ }
+ }
+ }
+
+ /**
+ * Adds a new URI to the pool if not already contained within. The URI will have
* any nested options that have been configured added to its existing set of options.
*
* The URI is added to the head of the pooled URIs and will be the next value that
@@ -220,6 +240,41 @@ public class FailoverUriPool {
}
/**
+ * Removes all currently configured URIs from the pool, no new URIs will be
+ * served from this pool until new ones are added.
+ */
+ public void removeAll() {
+ synchronized (uris) {
+ uris.clear();
+ }
+ }
+
+ /**
+ * Removes all currently configured URIs from the pool and replaces them with
+ * the new set given.
+ *
+ * @param replacements
+ * The new set of failover URIs to serve from this pool.
+ */
+ public void replaceAll(List<URI> replacements) {
+ synchronized (uris) {
+ uris.clear();
+ addAll(replacements);
+ }
+ }
+
+ /**
+ * Gets the current list of URIs. The returned list is a copy.
+ *
+ * @return a copy of the current list of URIs in the pool.
+ */
+ public List<URI> getList() {
+ synchronized (uris) {
+ return new ArrayList<>(uris);
+ }
+ }
+
+ /**
* Returns the currently set value for nested options which will be added to each
* URI that is returned from the pool.
*
@@ -259,7 +314,7 @@ public class FailoverUriPool {
if (firstAddr.equals(secondAddr)) {
result = true;
}
- } catch(IOException e) {
+ } catch (IOException e) {
if (firstAddr == null) {
LOG.error("Failed to Lookup INetAddress for URI[ " + first + " ] : " + e);
} else {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java
index fcb05a5..d3202e7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/TransportFactory.java
@@ -100,6 +100,13 @@ public abstract class TransportFactory {
public abstract String getName();
/**
+ * @return true if the Transport that this factory provides uses a secure channel.
+ */
+ public boolean isSecure() {
+ return false;
+ }
+
+ /**
* Static create method that performs the TransportFactory search and handles the
* configuration and setup.
*
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactory.java
index 59d7061..628983a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettySslTransportFactory.java
@@ -33,4 +33,9 @@ public class NettySslTransportFactory extends NettyTcpTransportFactory {
public String getName() {
return "SSL";
}
+
+ @Override
+ public boolean isSecure() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWssTransportFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWssTransportFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWssTransportFactory.java
index 71e396c..ef594c6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWssTransportFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/transports/netty/NettyWssTransportFactory.java
@@ -33,4 +33,9 @@ public class NettyWssTransportFactory extends NettyWsTransportFactory {
public String getName() {
return "WSS";
}
+
+ @Override
+ public boolean isSecure() {
+ return true;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
index 83d370a..900259b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PropertyUtil.java
@@ -27,7 +27,6 @@ import java.net.URL;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@@ -201,7 +200,7 @@ public class PropertyUtil {
*/
public static Map<String, String> parseQuery(String queryString) throws Exception {
if (queryString != null && !queryString.isEmpty()) {
- Map<String, String> rc = new HashMap<String, String>();
+ Map<String, String> rc = new LinkedHashMap<String, String>();
String[] parameters = queryString.split("&");
for (int i = 0; i < parameters.length; i++) {
int p = parameters[i].indexOf("=");
@@ -236,7 +235,7 @@ public class PropertyUtil {
throw new IllegalArgumentException("The given properties object was null.");
}
- HashMap<String, String> rc = new HashMap<String, String>(properties.size());
+ Map<String, String> rc = new LinkedHashMap<String, String>(properties.size());
for (Iterator<Entry<String, String>> iter = properties.entrySet().iterator(); iter.hasNext();) {
Entry<String, String> entry = iter.next();
@@ -269,7 +268,7 @@ public class PropertyUtil {
throw new IllegalArgumentException("Given Properties object cannot be null");
}
- Map<String, String> unmatched = new HashMap<String, String>();
+ Map<String, String> unmatched = new LinkedHashMap<String, String>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (!setProperty(target, entry.getKey(), entry.getValue())) {
@@ -299,7 +298,7 @@ public class PropertyUtil {
throw new IllegalArgumentException("Given Properties object cannot be null");
}
- Map<String, Object> unmatched = new HashMap<String, Object>();
+ Map<String, Object> unmatched = new LinkedHashMap<String, Object>();
for (Map.Entry<Object, Object> entry : properties.entrySet()) {
if (!setProperty(target, (String) entry.getKey(), entry.getValue())) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/URISupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/URISupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/URISupport.java
index 2dc54f4..6fb9b97 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/URISupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/URISupport.java
@@ -21,6 +21,7 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -405,8 +406,11 @@ public class URISupport {
/**
* Given a Key / Value mapping create and append a URI query value that represents the
* mapped entries, return the newly updated URI that contains the value of the given URI and
- * the appended query value. Each entry in the query string is prefixed by the supplied
- * optionPrefix string.
+ * the appended query value. Only values in the given options map that start with the provided
+ * prefix are appended to the provided URI, the prefix is stripped off before the insertion.
+ * <P>
+ * This method replaces the value of any matching query string options in the original URI with
+ * the value given in the provided query parameters map.
*
* @param uri
* The source URI that will have the Map entries appended as a URI query value.
@@ -422,18 +426,20 @@ public class URISupport {
*/
public static URI applyParameters(URI uri, Map<String, String> queryParameters, String optionPrefix) throws URISyntaxException {
if (queryParameters != null && !queryParameters.isEmpty()) {
- StringBuffer newQuery = uri.getRawQuery() != null ? new StringBuffer(uri.getRawQuery()) : new StringBuffer();
+
+ Map<String, String> currentParameters = new LinkedHashMap<String, String>(parseParameters(uri));
+
+ // Replace any old values with the new value from the provided map.
for (Map.Entry<String, String> param : queryParameters.entrySet()) {
if (param.getKey().startsWith(optionPrefix)) {
- if (newQuery.length() != 0) {
- newQuery.append('&');
- }
final String key = param.getKey().substring(optionPrefix.length());
- newQuery.append(key).append('=').append(param.getValue());
+ currentParameters.put(key, param.getValue());
}
}
- uri = PropertyUtil.replaceQuery(uri, newQuery.toString());
+
+ uri = PropertyUtil.replaceQuery(uri, currentParameters);
}
+
return uri;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqp
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqp b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqp
index 56741dc..e6558d0 100644
--- a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqp
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqp
@@ -15,4 +15,5 @@
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory
-transportType=tcp
\ No newline at end of file
+transportScheme=tcp
+providerScheme=amqp
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqps
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqps b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqps
index 414957b..1a5b61f 100644
--- a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqps
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqps
@@ -15,4 +15,5 @@
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory
-transportType=ssl
\ No newline at end of file
+transportScheme=ssl
+providerScheme=amqps
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpws
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpws b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpws
index 9cd4d1c..87fa753 100644
--- a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpws
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpws
@@ -15,4 +15,5 @@
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory
-transportType=ws
\ No newline at end of file
+transportScheme=ws
+providerScheme=amqpws
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpwss
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpwss b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpwss
index 4c2b8c9..705d2e2 100644
--- a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpwss
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/amqpwss
@@ -15,4 +15,5 @@
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory
-transportType=wss
\ No newline at end of file
+transportScheme=wss
+providerScheme=amqpwss
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/ws
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/ws b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/ws
new file mode 100644
index 0000000..87fa753
--- /dev/null
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/ws
@@ -0,0 +1,19 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory
+transportScheme=ws
+providerScheme=amqpws
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/wss
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/wss b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/wss
new file mode 100644
index 0000000..705d2e2
--- /dev/null
+++ b/qpid-jms-client/src/main/resources/META-INF/services/org/apache/qpid/jms/provider/redirects/wss
@@ -0,0 +1,19 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements. See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License. You may obtain a copy of the License at
+##
+## http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+class=org.apache.qpid.jms.provider.amqp.AmqpProviderFactory
+transportScheme=wss
+providerScheme=amqpwss
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
index c1b8b9e..b152894 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsDefaultConnectionListener.java
@@ -19,6 +19,7 @@
package org.apache.qpid.jms;
import java.net.URI;
+import java.util.List;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -59,4 +60,8 @@ public class JmsDefaultConnectionListener implements JmsConnectionListener {
@Override
public void onProducerClosed(MessageProducer producer, Throwable cause) {
}
+
+ @Override
+ public void onRemoteDiscovery(List<URI> remotes) {
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index f66ca6d..e5b752c 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -457,9 +457,12 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
assertTrue(asyncError.get().getCause() instanceof ProviderRedirectedException);
ProviderRedirectedException redirect = (ProviderRedirectedException) asyncError.get().getCause();
- assertEquals(redirectVhost, redirect.getHostname());
- assertEquals(redirectNetworkHost, redirect.getNetworkHost());
- assertEquals(redirectPort, redirect.getPort());
+ URI redirectionURI = redirect.getRedirectionURI();
+
+ assertNotNull(redirectionURI);
+ assertTrue(redirectVhost, redirectionURI.getQuery().contains("amqp.vhost=" + redirectVhost));
+ assertEquals(redirectNetworkHost, redirectionURI.getHost());
+ assertEquals(redirectPort, redirectionURI.getPort());
testPeer.waitForAllHandlersToComplete(1000);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6295f7e6/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
index 1292352..c501cfa 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/FailedConnectionsIntegrationTest.java
@@ -20,10 +20,12 @@ import static org.apache.qpid.jms.provider.amqp.AmqpSupport.NETWORK_HOST;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.OPEN_HOSTNAME;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PORT;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -171,9 +173,13 @@ public class FailedConnectionsIntegrationTest extends QpidJmsTestCase {
} catch (JMSException jmsex) {
assertTrue(jmsex.getCause() instanceof ProviderRedirectedException);
ProviderRedirectedException redirectEx = (ProviderRedirectedException) jmsex.getCause();
- assertEquals("vhost", redirectEx.getHostname());
- assertEquals("127.0.0.1", redirectEx.getNetworkHost());
- assertEquals(5672, redirectEx.getPort());
+
+ URI redirectionURI = redirectEx.getRedirectionURI();
+
+ assertNotNull(redirectionURI);
+ assertTrue("vhost", redirectionURI.getQuery().contains("amqp.vhost=vhost"));
+ assertEquals("127.0.0.1", redirectionURI.getHost());
+ assertEquals(5672, redirectionURI.getPort());
} catch (Exception ex) {
fail("Should have thrown JMSException: " + ex);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org