You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/06/19 11:44:31 UTC
[1/2] activemq-artemis git commit: This closes #1337
Repository: activemq-artemis
Updated Branches:
refs/heads/master 8dd6b712f -> 7610dadaf
This closes #1337
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7610dada
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7610dada
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7610dada
Branch: refs/heads/master
Commit: 7610dadaf36d4fb05cdf35f900c840ef5496a173
Parents: 8dd6b71 44b7e45
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Jun 19 12:44:13 2017 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Jun 19 12:44:13 2017 +0100
----------------------------------------------------------------------
.../config/ActiveMQDefaultConfiguration.java | 6 ++
.../artemis/jms/client/ActiveMQDestination.java | 6 +-
.../artemis/jms/client/ActiveMQSession.java | 6 +-
.../amqp/broker/ProtonProtocolManager.java | 3 +-
.../client/AMQPClientConnectionFactory.java | 4 +-
.../amqp/proton/AMQPConnectionContext.java | 8 ++
.../amqp/proton/ProtonServerSenderContext.java | 52 +++++++++---
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../core/protocol/openwire/amq/AMQConsumer.java | 2 +-
.../ra/inflow/ActiveMQMessageHandler.java | 2 +-
.../artemis/core/config/Configuration.java | 10 +++
.../core/config/impl/ConfigurationImpl.java | 14 ++++
.../deployers/impl/FileConfigurationParser.java | 6 ++
.../resources/schema/artemis-configuration.xsd | 10 +++
.../impl/DefaultsFileConfigurationTest.java | 2 +
.../core/config/impl/FileConfigurationTest.java | 1 +
.../resources/ConfigurationTest-full-config.xml | 1 +
.../amqp/ClientDefinedMultiConsumerTest.java | 32 ++++----
.../integration/amqp/JMSClientTestSupport.java | 52 ++++++++++++
.../amqp/JMSDurableConsumerTest.java | 22 ++++++
.../integration/amqp/JMSSharedConsumerTest.java | 79 ++++++-------------
.../amqp/JMSSharedDurableConsumerTest.java | 83 +++++++-------------
22 files changed, 255 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-1205: AMQP Shared Durable
Subscriber incorrect behaviour
Posted by ma...@apache.org.
ARTEMIS-1205: AMQP Shared Durable Subscriber incorrect behaviour
Use ActiveMQDestination for subscription naming, fixing and aligning queue naming in the process.
The change is behind a configuration toggle so as to avoid causing any breaking changes for users not expecting it.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/44b7e455
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/44b7e455
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/44b7e455
Branch: refs/heads/master
Commit: 44b7e455cb93ca0151793ee24de40b0a7a58301b
Parents: 8dd6b71
Author: Michael Andre Pearce <Mi...@me.com>
Authored: Tue Jun 13 09:23:33 2017 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Mon Jun 19 12:44:13 2017 +0100
----------------------------------------------------------------------
.../config/ActiveMQDefaultConfiguration.java | 6 ++
.../artemis/jms/client/ActiveMQDestination.java | 6 +-
.../artemis/jms/client/ActiveMQSession.java | 6 +-
.../amqp/broker/ProtonProtocolManager.java | 3 +-
.../client/AMQPClientConnectionFactory.java | 4 +-
.../amqp/proton/AMQPConnectionContext.java | 8 ++
.../amqp/proton/ProtonServerSenderContext.java | 52 +++++++++---
.../protocol/openwire/OpenWireConnection.java | 2 +-
.../core/protocol/openwire/amq/AMQConsumer.java | 2 +-
.../ra/inflow/ActiveMQMessageHandler.java | 2 +-
.../artemis/core/config/Configuration.java | 10 +++
.../core/config/impl/ConfigurationImpl.java | 14 ++++
.../deployers/impl/FileConfigurationParser.java | 6 ++
.../resources/schema/artemis-configuration.xsd | 10 +++
.../impl/DefaultsFileConfigurationTest.java | 2 +
.../core/config/impl/FileConfigurationTest.java | 1 +
.../resources/ConfigurationTest-full-config.xml | 1 +
.../amqp/ClientDefinedMultiConsumerTest.java | 32 ++++----
.../integration/amqp/JMSClientTestSupport.java | 52 ++++++++++++
.../amqp/JMSDurableConsumerTest.java | 22 ++++++
.../integration/amqp/JMSSharedConsumerTest.java | 79 ++++++-------------
.../amqp/JMSSharedDurableConsumerTest.java | 83 +++++++-------------
22 files changed, 255 insertions(+), 148 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 6bd0ae8..7ace35c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -442,6 +442,8 @@ public final class ActiveMQDefaultConfiguration {
// Default period to wait between configuration file checks
public static final long DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD = 5000;
+ public static final boolean DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING = false;
+
public static final long DEFAULT_GLOBAL_MAX_SIZE = Runtime.getRuntime().maxMemory() / 2;
public static final int DEFAULT_MAX_DISK_USAGE = 100;
@@ -1207,6 +1209,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_CONFIGURATION_FILE_REFRESH_PERIOD;
}
+ public static boolean getDefaultAmqpUseCoreSubscriptionNaming() {
+ return DEFAULT_AMQP_USE_CORE_SUBSCRIPTION_NAMING;
+ }
+
/**
* The default global max size. -1 = no global max size.
*/
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
index 297efe8..0bf4dd6 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQDestination.java
@@ -102,9 +102,9 @@ public class ActiveMQDestination implements Destination, Serializable, Reference
}
}
- public static String createQueueNameForDurableSubscription(final boolean isDurable,
- final String clientID,
- final String subscriptionName) {
+ public static String createQueueNameForSubscription(final boolean isDurable,
+ final String clientID,
+ final String subscriptionName) {
if (clientID != null) {
if (isDurable) {
return ActiveMQDestination.escape(clientID) + SEPARATOR +
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
index a8aceec..374a985 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQSession.java
@@ -627,7 +627,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
}
- queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName));
+ queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(durability == ConsumerDurability.DURABLE, connection.getClientID(), subscriptionName));
if (durability == ConsumerDurability.DURABLE) {
try {
@@ -750,7 +750,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new InvalidDestinationException("Cannot create a durable subscription on a temporary topic");
}
- queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), subscriptionName));
+ queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), subscriptionName));
QueueQuery subResponse = session.queueQuery(queueName);
@@ -918,7 +918,7 @@ public class ActiveMQSession implements QueueSession, TopicSession {
throw new IllegalStateException("Cannot unsubscribe using a QueueSession");
}
- SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, connection.getClientID(), name));
+ SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, connection.getClientID(), name));
try {
QueueQuery response = session.queueQuery(queueName);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index b5325fb..d36f18e 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -116,7 +116,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
}
String id = server.getConfiguration().getName();
- AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
+ boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
+ AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
Executor executor = server.getExecutorFactory().getExecutor();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
index 6aa8fda..4e532bb 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/client/AMQPClientConnectionFactory.java
@@ -39,12 +39,14 @@ public class AMQPClientConnectionFactory {
private final String containerId;
private final Map<Symbol, Object> connectionProperties;
private final int ttl;
+ private final boolean useCoreSubscriptionNaming;
public AMQPClientConnectionFactory(ActiveMQServer server, String containerId, Map<Symbol, Object> connectionProperties, int ttl) {
this.server = server;
this.containerId = containerId;
this.connectionProperties = connectionProperties;
this.ttl = ttl;
+ this.useCoreSubscriptionNaming = false;
}
public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler) {
@@ -52,7 +54,7 @@ public class AMQPClientConnectionFactory {
Executor executor = server.getExecutorFactory().getExecutor();
- AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, server.getScheduledPool());
+ AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
index 4a46a8a..0ab4171 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java
@@ -74,16 +74,20 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
private final ProtonProtocolManager protocolManager;
+ private final boolean useCoreSubscriptionNaming;
+
public AMQPConnectionContext(ProtonProtocolManager protocolManager,
AMQPConnectionCallback connectionSP,
String containerId,
int idleTimeout,
int maxFrameSize,
int channelMax,
+ boolean useCoreSubscriptionNaming,
ScheduledExecutorService scheduledPool) {
this.protocolManager = protocolManager;
this.connectionCallback = connectionSP;
+ this.useCoreSubscriptionNaming = useCoreSubscriptionNaming;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
@@ -260,6 +264,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
}
+ public boolean isUseCoreSubscriptionNaming() {
+ return useCoreSubscriptionNaming;
+ }
+
@Override
public void onInit(Connection connection) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index ad8bee7..8f8222b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -34,6 +34,7 @@ import org.apache.activemq.artemis.core.server.AddressQueryResult;
import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
@@ -68,6 +69,7 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
+import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
import org.jboss.logging.Logger;
@@ -188,7 +190,8 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// subscription queue
String clientId = getClientId();
String pubId = sender.getName();
- queue = createQueueName(clientId, pubId, true, global, false);
+ global = hasRemoteDesiredCapability(sender, GLOBAL);
+ queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, true, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, RoutingType.MULTICAST, false);
multicast = true;
routingTypeToUse = RoutingType.MULTICAST;
@@ -343,7 +346,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// id and link name
String clientId = getClientId();
String pubId = sender.getName();
- queue = createQueueName(clientId, pubId, shared, global, false);
+ queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, false);
QueueQueryResult result = sessionSPI.queueQuery(queue, routingTypeToUse, false);
if (result.isExists()) {
@@ -369,7 +372,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
// otherwise we are a volatile subscription
isVolatile = true;
if (shared && sender.getName() != null) {
- queue = createQueueName(getClientId(), sender.getName(), shared, global, isVolatile);
+ queue = createQueueName(connection.isUseCoreSubscriptionNaming(), getClientId(), sender.getName(), shared, global, isVolatile);
try {
sessionSPI.createSharedVolatileQueue(source.getAddress(), RoutingType.MULTICAST, queue, selector);
} catch (ActiveMQQueueExistsException e) {
@@ -493,7 +496,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
if (pubId.contains("|")) {
pubId = pubId.split("\\|")[0];
}
- String queue = createQueueName(clientId, pubId, shared, global, isVolatile);
+ String queue = createQueueName(connection.isUseCoreSubscriptionNaming(), clientId, pubId, shared, global, isVolatile);
result = sessionSPI.queueQuery(queue, multicast ? RoutingType.MULTICAST : RoutingType.ANYCAST, false);
//only delete if it isn't volatile and has no consumers
if (result.isExists() && !isVolatile && result.getConsumerCount() == 0) {
@@ -733,20 +736,43 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr
return false;
}
- private static String createQueueName(String clientId,
+ private static boolean hasRemoteDesiredCapability(Link link, Symbol capability) {
+ Symbol[] remoteDesiredCapabilities = link.getRemoteDesiredCapabilities();
+ if (remoteDesiredCapabilities != null) {
+ for (Symbol cap : remoteDesiredCapabilities) {
+ if (capability.equals(cap)) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ private static String createQueueName(boolean useCoreSubscriptionNaming,
+ String clientId,
String pubId,
boolean shared,
boolean global,
boolean isVolatile) {
- String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId;
- if (shared) {
- if (queue.contains("|")) {
- queue = queue.split("\\|")[0];
- }
- if (isVolatile) {
- queue = "nonDurable" + "." + queue;
+ if (useCoreSubscriptionNaming) {
+ final boolean durable = !isVolatile;
+ final String subscriptionName = pubId.contains("|") ? pubId.split("\\|")[0] : pubId;
+ final String clientID = clientId == null || clientId.isEmpty() || global ? null : clientId;
+ return ActiveMQDestination.createQueueNameForSubscription(durable, clientID, subscriptionName);
+ } else {
+ String queue = clientId == null || clientId.isEmpty() || global ? pubId : clientId + "." + pubId;
+ if (shared) {
+ if (queue.contains("|")) {
+ queue = queue.split("\\|")[0];
+ }
+ if (isVolatile) {
+ queue += ":shared-volatile";
+ }
+ if (global) {
+ queue += ":global";
+ }
}
+ return queue;
}
- return queue;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index c63d266..a56901e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -1093,7 +1093,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processRemoveSubscription(RemoveSubscriptionInfo subInfo) throws Exception {
- SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
+ SimpleString subQueueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, subInfo.getClientId(), subInfo.getSubscriptionName()));
server.destroyQueue(subQueueName);
return null;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 3bdee8b..6bba4e1 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -150,7 +150,7 @@ public class AMQConsumer {
addressInfo.addRoutingType(RoutingType.MULTICAST);
}
if (isDurable) {
- queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForDurableSubscription(true, clientID, subscriptionName));
+ queueName = new SimpleString(org.apache.activemq.artemis.jms.client.ActiveMQDestination.createQueueNameForSubscription(true, clientID, subscriptionName));
QueueQueryResult result = session.getCoreSession().executeQueueQuery(queueName);
if (result.isExists()) {
// Already exists
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
----------------------------------------------------------------------
diff --git a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
index 4ff6357..bb2f870 100644
--- a/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
+++ b/artemis-ra/src/main/java/org/apache/activemq/artemis/ra/inflow/ActiveMQMessageHandler.java
@@ -112,7 +112,7 @@ public class ActiveMQMessageHandler implements MessageHandler, FailoverEventList
// Create the message consumer
SimpleString selectorString = selector == null || selector.trim().equals("") ? null : new SimpleString(selector);
if (activation.isTopic() && spec.isSubscriptionDurable()) {
- SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForDurableSubscription(true, spec.getClientID(), spec.getSubscriptionName()));
+ SimpleString queueName = new SimpleString(ActiveMQDestination.createQueueNameForSubscription(true, spec.getClientID(), spec.getSubscriptionName()));
QueueQuery subResponse = session.queueQuery(queueName);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
index aad3968..9d5d4a8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/Configuration.java
@@ -290,6 +290,16 @@ public interface Configuration {
Configuration setConnectionTTLOverride(long ttl);
/**
+ * Returns if to use Core subscription naming for AMQP.
+ */
+ boolean isAmqpUseCoreSubscriptionNaming();
+
+ /**
+ * Sets if to use Core subscription naming for AMQP.
+ */
+ Configuration setAmqpUseCoreSubscriptionNaming(boolean amqpUseCoreSubscriptionNaming);
+
+ /**
* Returns whether code coming from connection is executed asynchronously or not. <br>
* Default value is
* {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_ASYNC_CONNECTION_EXECUTION_ENABLED}.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
index 8edeb5b..fb678b6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/impl/ConfigurationImpl.java
@@ -262,6 +262,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private Long globalMaxSize;
+ private boolean amqpUseCoreSubscriptionNaming = ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming();
+
private int maxDiskUsage = ActiveMQDefaultConfiguration.getDefaultMaxDiskUsage();
private int diskScanPeriod = ActiveMQDefaultConfiguration.getDefaultDiskScanPeriod();
@@ -454,6 +456,18 @@ public class ConfigurationImpl implements Configuration, Serializable {
}
@Override
+ public boolean isAmqpUseCoreSubscriptionNaming() {
+ return amqpUseCoreSubscriptionNaming;
+ }
+
+ @Override
+ public Configuration setAmqpUseCoreSubscriptionNaming(boolean amqpUseCoreSubscriptionNaming) {
+ this.amqpUseCoreSubscriptionNaming = amqpUseCoreSubscriptionNaming;
+ return this;
+ }
+
+
+ @Override
public boolean isAsyncConnectionExecutionEnabled() {
return asyncConnectionExecutionEnabled;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 5243325..a3d59f6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -219,6 +219,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
private static final String INTERNAL_NAMING_PREFIX = "internal-naming-prefix";
+ private static final String AMQP_USE_CORE_SUBSCRIPTION_NAMING = "amqp-use-core-subscription-naming";
+
+
// Attributes ----------------------------------------------------
private boolean validateAIO = false;
@@ -342,6 +345,9 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setInternalNamingPrefix(getString(e, INTERNAL_NAMING_PREFIX, config.getInternalNamingPrefix(), Validators.NO_CHECK));
+ config.setAmqpUseCoreSubscriptionNaming(getBoolean(e, AMQP_USE_CORE_SUBSCRIPTION_NAMING, config.isAmqpUseCoreSubscriptionNaming()));
+
+
// parsing cluster password
String passwordText = getString(e, "cluster-password", null, Validators.NO_CHECK);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/main/resources/schema/artemis-configuration.xsd
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index b0e8502..6af18bd 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -52,6 +52,16 @@
</xsd:annotation>
</xsd:element>
+ <xsd:element name="amqp-use-core-subscription-naming" type="xsd:boolean" maxOccurs="1" minOccurs="0" default="false">
+ <xsd:annotation>
+ <xsd:documentation>
+ This enables making AMQP subscription queue names, match core queue names, for better interoperability between protocols.
+ Note: Enabling this to an existing broker if pre-existing amqp durable subscriptions already existed will require
+ clients to re-subscribe and to clean up old subscription names.
+ </xsd:documentation>
+ </xsd:annotation>
+ </xsd:element>
+
<xsd:element name="resolve-protocols" type="xsd:boolean" default="true" maxOccurs="1"
minOccurs="0">
<xsd:annotation>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java
index 07d5f58..a07c797 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/DefaultsFileConfigurationTest.java
@@ -133,6 +133,8 @@ public class DefaultsFileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(ActiveMQDefaultConfiguration.isDefaultGracefulShutdownEnabled(), conf.isGracefulShutdownEnabled());
Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultGracefulShutdownTimeout(), conf.getGracefulShutdownTimeout());
+
+ Assert.assertEquals(ActiveMQDefaultConfiguration.getDefaultAmqpUseCoreSubscriptionNaming(), conf.isAmqpUseCoreSubscriptionNaming());
}
// Protected ---------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 2f18365..28ad83e 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -92,6 +92,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals("pagingdir", conf.getPagingDirectory());
Assert.assertEquals("somedir", conf.getBindingsDirectory());
Assert.assertEquals(false, conf.isCreateBindingsDir());
+ Assert.assertEquals(true, conf.isAmqpUseCoreSubscriptionNaming());
Assert.assertEquals("max concurrent io", 17, conf.getPageMaxConcurrentIO());
Assert.assertEquals("somedir2", conf.getJournalDirectory());
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
----------------------------------------------------------------------
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 0691e95..ca10eb9 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -122,6 +122,7 @@
<exclusive>false</exclusive>
</divert>
</diverts>
+ <amqp-use-core-subscription-naming>true</amqp-use-core-subscription-naming>
<queues>
<queue name="queue1">
<address>address1</address>
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
index 3f8da1a..51c70ee 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/ClientDefinedMultiConsumerTest.java
@@ -57,15 +57,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
- assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
receiver.close();
- assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
receiver2.close();
//check its been deleted
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
- return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null;
+ return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
}
}, 1000);
connection.close();
@@ -76,7 +76,7 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
AddressInfo addressInfo = new AddressInfo(address);
addressInfo.getRoutingTypes().add(RoutingType.MULTICAST);
server.addAddressInfo(addressInfo);
- server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("nonDurable.myClientId.mySub"), null, true, false, -1, false, false);
+ server.createQueue(address, RoutingType.MULTICAST, SimpleString.toSimpleString("myClientId.mySub:shared-volatile"), null, true, false, -1, false, false);
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect("myClientId"));
@@ -91,12 +91,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
- assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
receiver.close();
- assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
receiver2.close();
//check its **Hasn't** been deleted
- assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
connection.close();
}
@@ -119,14 +119,14 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
- assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")).getBindable()).getConsumerCount());
- assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")));
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")).getBindable()).getConsumerCount());
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")));
//check its been deleted
connection.close();
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
- return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.myClientId.mySub")) == null;
+ return server.getPostOffice().getBinding(SimpleString.toSimpleString("myClientId.mySub:shared-volatile")) == null;
}
}, 1000);
}
@@ -150,15 +150,15 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
- assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")).getBindable()).getConsumerCount());
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")).getBindable()).getConsumerCount());
receiver.close();
- assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")));
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")));
receiver2.close();
//check its been deleted
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
- return server.getPostOffice().getBinding(SimpleString.toSimpleString("nonDurable.mySub")) == null;
+ return server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:shared-volatile:global")) == null;
}
}, 1000);
connection.close();
@@ -287,12 +287,12 @@ public class ClientDefinedMultiConsumerTest extends AmqpClientTestSupport {
assertNotNull(amqpMessage);
amqpMessage = receiver2.receive(5, TimeUnit.SECONDS);
assertNotNull(amqpMessage);
- assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")).getBindable()).getConsumerCount());
+ assertEquals(2, ((QueueImpl)server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")).getBindable()).getConsumerCount());
receiver.close();
- assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")));
+ assertNotNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
receiver2.close();
//check its been deleted
- assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub")));
+ assertNull(server.getPostOffice().getBinding(SimpleString.toSimpleString("mySub:global")));
connection.close();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
index edd4968..3f96711 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java
@@ -23,6 +23,7 @@ import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.jboss.logging.Logger;
import org.junit.After;
@@ -154,4 +155,55 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
return connection;
}
+
+
+ protected String getBrokerCoreJMSConnectionString() {
+
+ try {
+ int port = AMQP_PORT;
+
+ String uri = null;
+
+ if (isUseSSL()) {
+ uri = "tcp://127.0.0.1:" + port;
+ } else {
+ uri = "tcp://127.0.0.1:" + port;
+ }
+
+ if (!getJmsConnectionURIOptions().isEmpty()) {
+ uri = uri + "?" + getJmsConnectionURIOptions();
+ }
+
+ return uri;
+ } catch (Exception e) {
+ throw new RuntimeException();
+ }
+ }
+
+ protected Connection createCoreConnection() throws JMSException {
+ return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
+ }
+
+ private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
+ ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
+
+ Connection connection = trackJMSConnection(factory.createConnection(username, password));
+
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ exception.printStackTrace();
+ }
+ });
+
+ if (clientId != null && !clientId.isEmpty()) {
+ connection.setClientID(clientId);
+ }
+
+ if (start) {
+ connection.start();
+ }
+
+ return connection;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java
index 26097f6..31de59d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSDurableConsumerTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
@@ -31,11 +33,31 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class JMSDurableConsumerTest extends JMSClientTestSupport {
+ @Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ {true}, {false}
+ });
+ }
+
+ /* NOT private @see https://github.com/junit-team/junit4/wiki/parameterized-tests */
+ @Parameterized.Parameter(0)
+ public boolean amqpUseCoreSubscriptionNaming;
+
+ @Override
+ protected void addConfiguration(ActiveMQServer server) {
+ server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming);
+ }
+
@Test(timeout = 30000)
public void testDurableConsumerAsync() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
index c49fcff..4113e4e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedConsumerTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -27,11 +26,33 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
-import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class JMSSharedConsumerTest extends JMSClientTestSupport {
+ @Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ {true}, {false}
+ });
+ }
+
+ /* NOT private @see https://github.com/junit-team/junit4/wiki/parameterized-tests */
+ @Parameterized.Parameter(0)
+ public boolean amqpUseCoreSubscriptionNaming;
+
+ @Override
+ protected void addConfiguration(ActiveMQServer server) {
+ server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming);
+ }
+
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
@@ -94,6 +115,7 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
@Test(timeout = 30000)
public void testSharedConsumerWithAMQPClientAndArtemisClient() throws Exception {
+ org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
@@ -104,6 +126,7 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
@Test(timeout = 30000)
public void testSharedConsumerWithArtemisClientAndAMQPClient() throws Exception {
+ org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
@@ -111,56 +134,4 @@ public class JMSSharedConsumerTest extends JMSClientTestSupport {
testSharedConsumer(connection, connection2);
}
-
-
- protected String getBrokerCoreJMSConnectionString() {
-
- try {
- int port = AMQP_PORT;
-
- String uri = null;
-
- if (isUseSSL()) {
- uri = "tcp://127.0.0.1:" + port;
- } else {
- uri = "tcp://127.0.0.1:" + port;
- }
-
- if (!getJmsConnectionURIOptions().isEmpty()) {
- uri = uri + "?" + getJmsConnectionURIOptions();
- }
-
- return uri;
- } catch (Exception e) {
- throw new RuntimeException();
- }
- }
-
- protected Connection createCoreConnection() throws JMSException {
- return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
- }
-
- private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
- ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
-
- Connection connection = trackJMSConnection(factory.createConnection(username, password));
-
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- exception.printStackTrace();
- }
- });
-
- if (clientId != null && !clientId.isEmpty()) {
- connection.setClientID(clientId);
- }
-
- if (start) {
- connection.start();
- }
-
- return connection;
- }
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/44b7e455/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
index 040506b..ad0d9dd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSSharedDurableConsumerTest.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
-import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
@@ -27,11 +26,33 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
-import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
+ @Parameterized.Parameters(name = "{index}: amqpUseCoreSubscriptionNaming={0}")
+ public static Collection<Object[]> parameters() {
+ return Arrays.asList(new Object[][] {
+ {true}, {false}
+ });
+ }
+
+ /* NOT private @see https://github.com/junit-team/junit4/wiki/parameterized-tests */
+ @Parameterized.Parameter(0)
+ public boolean amqpUseCoreSubscriptionNaming;
+
+ @Override
+ protected void addConfiguration(ActiveMQServer server) {
+ server.getConfiguration().setAmqpUseCoreSubscriptionNaming(amqpUseCoreSubscriptionNaming);
+ }
+
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
@@ -68,6 +89,10 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
}
assertNotNull("Should have received a message by now.", received);
assertTrue("Should be an instance of TextMessage", received instanceof TextMessage);
+
+ consumer1.close();
+ consumer2.close();
+ session1.unsubscribe("SharedConsumer");
} finally {
connection1.close();
connection2.close();
@@ -94,6 +119,7 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
@Test(timeout = 30000)
public void testSharedDurableConsumerWithAMQPClientAndArtemisClient() throws Exception {
+ org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
Connection connection = createConnection(); //AMQP
Connection connection2 = createCoreConnection(); //CORE
@@ -104,6 +130,7 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
@Test(timeout = 30000)
public void testSharedDurableConsumerWithArtemisClientAndAMQPClient() throws Exception {
+ org.junit.Assume.assumeTrue(amqpUseCoreSubscriptionNaming);
Connection connection = createCoreConnection(); //CORE
Connection connection2 = createConnection(); //AMQP
@@ -111,56 +138,4 @@ public class JMSSharedDurableConsumerTest extends JMSClientTestSupport {
testSharedDurableConsumer(connection, connection2);
}
-
-
- protected String getBrokerCoreJMSConnectionString() {
-
- try {
- int port = AMQP_PORT;
-
- String uri = null;
-
- if (isUseSSL()) {
- uri = "tcp://127.0.0.1:" + port;
- } else {
- uri = "tcp://127.0.0.1:" + port;
- }
-
- if (!getJmsConnectionURIOptions().isEmpty()) {
- uri = uri + "?" + getJmsConnectionURIOptions();
- }
-
- return uri;
- } catch (Exception e) {
- throw new RuntimeException();
- }
- }
-
- protected Connection createCoreConnection() throws JMSException {
- return createCoreConnection(getBrokerCoreJMSConnectionString(), null, null, null, true);
- }
-
- private Connection createCoreConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {
- ActiveMQJMSConnectionFactory factory = new ActiveMQJMSConnectionFactory(connectionString);
-
- Connection connection = trackJMSConnection(factory.createConnection(username, password));
-
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- exception.printStackTrace();
- }
- });
-
- if (clientId != null && !clientId.isEmpty()) {
- connection.setClientID(clientId);
- }
-
- if (start) {
- connection.start();
- }
-
- return connection;
- }
-
}