You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/27 01:20:12 UTC
[1/2] activemq-artemis git commit: This closes #2042
Repository: activemq-artemis
Updated Branches:
refs/heads/master e937c9903 -> d18e8439b
This closes #2042
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d18e8439
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d18e8439
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d18e8439
Branch: refs/heads/master
Commit: d18e8439b5c896125806a557f163548722e32aff
Parents: e937c99 d773e8f
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Apr 26 21:20:07 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 26 21:20:07 2018 -0400
----------------------------------------------------------------------
.../core/protocol/stomp/StompConnection.java | 38 +++--
.../stomp/VersionedStompFrameHandler.java | 12 +-
.../stomp/v10/StompFrameHandlerV10.java | 46 +++---
.../tests/integration/stomp/StompTest.java | 155 +++++++++++++++++++
4 files changed, 219 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-1794 STOMP clients using
same addr w/diff routing types
Posted by cl...@apache.org.
ARTEMIS-1794 STOMP clients using same addr w/diff routing types
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d773e8f6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d773e8f6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d773e8f6
Branch: refs/heads/master
Commit: d773e8f66b5db2b0fcaa646384c24690d38aaa69
Parents: e937c99
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Apr 10 10:41:15 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 26 21:20:07 2018 -0400
----------------------------------------------------------------------
.../core/protocol/stomp/StompConnection.java | 38 +++--
.../stomp/VersionedStompFrameHandler.java | 12 +-
.../stomp/v10/StompFrameHandlerV10.java | 46 +++---
.../tests/integration/stomp/StompTest.java | 155 +++++++++++++++++++
4 files changed, 219 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d773e8f6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 32e64b3..fbd0107 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -262,7 +263,7 @@ public final class StompConnection implements RemotingConnection {
// TODO this should take a type - send or receive so it knows whether to check the address or the queue
public void checkDestination(String destination) throws ActiveMQStompException {
- if (!manager.destinationExists(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString())) {
+ if (!manager.destinationExists(destination)) {
throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
}
}
@@ -272,28 +273,47 @@ public final class StompConnection implements RemotingConnection {
try {
SimpleString simpleQueue = SimpleString.toSimpleString(queue);
- if (manager.getServer().getAddressInfo(simpleQueue) == null) {
- AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
-
- RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType;
+ AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleQueue);
+ AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
+ RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType;
+ boolean checkAnycast = false;
+ /**
+ * If the address doesn't exist then it is created if possible.
+ * If the address does exist but doesn't support the routing-type then the address is updated if possible.
+ */
+ if (addressInfo == null) {
if (addressSettings.isAutoCreateAddresses()) {
session.createAddress(simpleQueue, effectiveAddressRoutingType, true);
}
- // only auto create the queue if the address is ANYCAST
- if (effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues()) {
- session.createQueue(simpleQueue, simpleQueue, routingType == null ? addressSettings.getDefaultQueueRoutingType() : routingType, null, false, true, true);
+ checkAnycast = true;
+ } else if (!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType)) {
+ if (addressSettings.isAutoCreateAddresses()) {
+ EnumSet<RoutingType> routingTypes = EnumSet.noneOf(RoutingType.class);
+ for (RoutingType existingRoutingType : addressInfo.getRoutingTypes()) {
+ routingTypes.add(existingRoutingType);
+ }
+ routingTypes.add(effectiveAddressRoutingType);
+ manager.getServer().updateAddressInfo(simpleQueue, routingTypes);
}
+
+ checkAnycast = true;
+ }
+
+ // only auto create the queue if the address is ANYCAST
+ if (checkAnycast && effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues()) {
+ session.createQueue(simpleQueue, simpleQueue, routingType == null ? addressSettings.getDefaultQueueRoutingType() : routingType, null, false, true, true);
}
} catch (ActiveMQQueueExistsException e) {
// ignore
} catch (Exception e) {
+ ActiveMQStompProtocolLogger.LOGGER.debug("Exception while auto-creating destination", e);
throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
}
}
public void checkRoutingSemantics(String destination, RoutingType routingType) throws ActiveMQStompException {
- AddressInfo addressInfo = manager.getServer().getAddressInfo(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)));
+ AddressInfo addressInfo = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination));
// may be null here if, for example, the management address is being checked
if (addressInfo != null) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d773e8f6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 3cb5ab8..941cee6 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -279,8 +279,16 @@ public abstract class VersionedStompFrameHandler {
return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
}
- public String getDestination(StompFrame request) {
- return request.getHeader(Headers.Subscribe.DESTINATION);
+ public String getDestination(StompFrame request) throws ActiveMQStompException {
+ return getDestination(request, Headers.Subscribe.DESTINATION);
+ }
+
+ public String getDestination(StompFrame request, String header) throws ActiveMQStompException {
+ String destination = request.getHeader(header);
+ if (destination == null) {
+ return null;
+ }
+ return connection.getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString();
}
public StompFrame postprocess(StompFrame request) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d773e8f6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
index a6785b7..2011e05 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
@@ -94,32 +94,36 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
@Override
public StompFrame onUnsubscribe(StompFrame request) {
StompFrame response = null;
- String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION);
- String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
- String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
- if (durableSubscriptionName == null) {
- durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
- }
- if (durableSubscriptionName == null) {
- durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
- }
+ try {
+ String destination = getDestination(request, Stomp.Headers.Unsubscribe.DESTINATION);
+ String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
+ String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
+ if (durableSubscriptionName == null) {
+ durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
+ }
+ if (durableSubscriptionName == null) {
+ durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
+ }
- String subscriptionID = null;
- if (id != null) {
- subscriptionID = id;
- } else {
- if (destination == null) {
- ActiveMQStompException error = BUNDLE.needIDorDestination().setHandler(this);
- response = error.getFrame();
- return response;
+ String subscriptionID = null;
+ if (id != null) {
+ subscriptionID = id;
+ } else {
+ if (destination == null) {
+ ActiveMQStompException error = BUNDLE.needIDorDestination().setHandler(this);
+ response = error.getFrame();
+ return response;
+ }
+ subscriptionID = "subscription/" + destination;
}
- subscriptionID = "subscription/" + destination;
- }
- try {
connection.unsubscribe(subscriptionID, durableSubscriptionName);
+
} catch (ActiveMQStompException e) {
- return e.getFrame();
+ response = e.getFrame();
+ } catch (Exception e) {
+ ActiveMQStompException error = BUNDLE.errorHandleSend(e).setHandler(this);
+ response = error.getFrame();
}
return response;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d773e8f6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index cd18d0c..de6a11d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1573,6 +1573,159 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
+ /**
+ * This test and testPrefixedAutoCreatedMulticastAndAnycastWithSameName are basically the same but doing the
+ * operations in opposite order. In this test the anycast subscription is created first.
+ * @throws Exception
+ */
+ @Test
+ public void testPrefixedAutoCreatedAnycastAndMulticastWithSameName() throws Exception {
+ int port = 61614;
+
+ URI uri = createStompClientUri(scheme, hostname, port);
+
+ final String ADDRESS = UUID.randomUUID().toString();
+ server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=STOMP&anycastPrefix=/queue/&multicastPrefix=/topic/").start();
+ StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
+ conn.connect(defUser, defPass);
+
+ // since this queue doesn't exist the broker should create a new ANYCAST address & queue
+ String uuid = UUID.randomUUID().toString();
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+ .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS)
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+ frame = conn.sendFrame(frame);
+ assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+
+ AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
+ assertNotNull("No address was created with the name " + ADDRESS, addressInfo);
+ assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
+ assertFalse(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
+ assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(ADDRESS)));
+
+ // sending a MULTICAST message should alter the address to support MULTICAST
+ frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 1", true);
+ assertFalse(frame.getCommand().equals("ERROR"));
+ addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
+ assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
+ assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
+
+ // however, no message should be routed to the ANYCAST queue
+ frame = conn.receiveFrame(1000);
+ Assert.assertNull(frame);
+
+ // a message sent to the ANYCAST queue should be received
+ frame = send(conn, "/queue/" + ADDRESS, null, "Hello World 2", true);
+ assertFalse(frame.getCommand().equals("ERROR"));
+ frame = conn.receiveFrame(1000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals("Hello World 2", frame.getBody());
+ Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+ frame = conn.receiveFrame(1000);
+ Assert.assertNull(frame);
+
+ unsubscribe(conn, null, "/queue/" + ADDRESS, true, false);
+
+ // now subscribe to the address in a MULTICAST way which will create a MULTICAST queue for the subscription
+ uuid = UUID.randomUUID().toString();
+ frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+ .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS)
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+ frame = conn.sendFrame(frame);
+ assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+
+ // send a message which will be routed to the MULTICAST queue
+ frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 3", true);
+ assertFalse(frame.getCommand().equals("ERROR"));
+
+ // receive that message on the topic subscription
+ frame = conn.receiveFrame(1000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals("Hello World 3", frame.getBody());
+ Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+ frame = conn.receiveFrame(1000);
+ Assert.assertNull(frame);
+
+ unsubscribe(conn, null, "/topic/" + ADDRESS, true, false);
+
+ conn.disconnect();
+ }
+
+ /**
+ * This test and testPrefixedAutoCreatedAnycastAndMulticastWithSameName are basically the same but doing the
+ * operations in opposite order. In this test the multicast subscription is created first.
+ * @throws Exception
+ */
+ @Test
+ public void testPrefixedAutoCreatedMulticastAndAnycastWithSameName() throws Exception {
+ int port = 61614;
+
+ URI uri = createStompClientUri(scheme, hostname, port);
+
+ final String ADDRESS = UUID.randomUUID().toString();
+ server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=STOMP&anycastPrefix=/queue/&multicastPrefix=/topic/").start();
+ StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
+ conn.connect(defUser, defPass);
+
+ // since this queue doesn't exist the broker should create a new MULTICAST address
+ String uuid = UUID.randomUUID().toString();
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+ .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS)
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+ frame = conn.sendFrame(frame);
+ assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+
+ AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
+ assertNotNull("No address was created with the name " + ADDRESS, addressInfo);
+ assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
+ assertFalse(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
+
+ // sending an ANYCAST message should alter the address to support ANYCAST and create an ANYCAST queue
+ frame = send(conn, "/queue/" + ADDRESS, null, "Hello World 1", true);
+ assertFalse(frame.getCommand().equals("ERROR"));
+ addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
+ assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
+ assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
+ assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(ADDRESS)));
+
+ // however, no message should be routed to the MULTICAST queue
+ frame = conn.receiveFrame(1000);
+ Assert.assertNull(frame);
+
+ // a message sent to the MULTICAST queue should be received
+ frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 2", true);
+ assertFalse(frame.getCommand().equals("ERROR"));
+ frame = conn.receiveFrame(2000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals("Hello World 2", frame.getBody());
+ Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+ frame = conn.receiveFrame(1000);
+ Assert.assertNull(frame);
+
+ frame = unsubscribe(conn, null, "/topic/" + ADDRESS, true, false);
+ assertFalse(frame.getCommand().equals("ERROR"));
+
+ // now subscribe to the address in an ANYCAST way
+ uuid = UUID.randomUUID().toString();
+ frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+ .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS)
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+ frame = conn.sendFrame(frame);
+ assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+
+ // receive the "Hello World 1" message that was sent to the ANYCAST queue earlier, before this subscription existed
+ frame = conn.receiveFrame(1000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals("Hello World 1", frame.getBody());
+ Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+ frame = conn.receiveFrame(2000);
+ Assert.assertNull(frame);
+
+ unsubscribe(conn, null, "/queue/" + ADDRESS, true, false);
+
+ conn.disconnect();
+ }
+
@Test
public void testDotPrefixedSendAndRecieveAnycast() throws Exception {
testPrefixedSendAndRecieve("jms.queue.", RoutingType.ANYCAST);
@@ -1626,11 +1779,13 @@ public class StompTest extends StompTestBase {
@Test
public void testMulticastOperationsOnAnycastAddress() throws Exception {
+ server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
testRoutingSemantics(RoutingType.MULTICAST.toString(), getQueuePrefix() + getQueueName());
}
@Test
public void testAnycastOperationsOnMulticastAddress() throws Exception {
+ server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
testRoutingSemantics(RoutingType.ANYCAST.toString(), getTopicPrefix() + getTopicName());
}