You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/23 17:43:10 UTC
[09/48] activemq-artemis git commit: Check routing semantics for STOMP senders/subscribers
Check routing semantics for STOMP senders/subscribers
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c756499c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c756499c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c756499c
Branch: refs/heads/ARTEMIS-780
Commit: c756499ccb0b18940c92ab7d15cc1a85e81dd932
Parents: 3020837
Author: jbertram <jb...@apache.com>
Authored: Tue Nov 15 10:27:55 2016 -0600
Committer: jbertram <jb...@apache.com>
Committed: Tue Nov 15 10:38:13 2016 -0600
----------------------------------------------------------------------
.../protocol/stomp/ActiveMQStompException.java | 4 +--
.../ActiveMQStompProtocolMessageBundle.java | 7 ++--
.../core/protocol/stomp/StompConnection.java | 20 ++++++++++--
.../stomp/VersionedStompFrameHandler.java | 6 +++-
.../tests/integration/stomp/StompTest.java | 34 ++++++++++++++++++++
.../tests/integration/stomp/StompTestBase.java | 3 +-
.../integration/stomp/v11/StompV11Test.java | 18 -----------
.../integration/stomp/v12/StompV12Test.java | 15 ---------
8 files changed, 65 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
index 118f7f8..69fa130 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
@@ -40,12 +40,12 @@ public class ActiveMQStompException extends Exception {
}
public ActiveMQStompException(String msg) {
- super(msg);
+ super(msg.replace(":", ""));
handler = null;
}
public ActiveMQStompException(String msg, Throwable t) {
- super(msg, t);
+ super(msg.replace(":", ""), t);
this.body = t.getMessage();
handler = null;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
index e535725..c1f93e4 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
@@ -86,10 +86,10 @@ public interface ActiveMQStompProtocolMessageBundle {
ActiveMQStompException noDestination();
@Message(id = 339016, value = "Error creating subscription {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQStompException errorCreatSubscription(String subscriptionID, @Cause Exception e);
+ ActiveMQStompException errorCreatingSubscription(String subscriptionID, @Cause Exception e);
@Message(id = 339017, value = "Error unsubscribing {0}", format = Message.Format.MESSAGE_FORMAT)
- ActiveMQStompException errorUnsubscrib(String subscriptionID, @Cause Exception e);
+ ActiveMQStompException errorUnsubscribing(String subscriptionID, @Cause Exception e);
@Message(id = 339018, value = "Error acknowledging message {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQStompException errorAck(String messageID, @Cause Exception e);
@@ -150,4 +150,7 @@ public interface ActiveMQStompProtocolMessageBundle {
@Message(id = 339039, value = "No id header in ACK/NACK frame.")
ActiveMQStompException noIDInAck();
+
+ @Message(id = 339040, value = "Not allowed to specify {0} semantics on {1} address.", format = Message.Format.MESSAGE_FORMAT)
+ ActiveMQStompException illegalSemantics(String requested, String exists);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index a6eab6b..eaeb21d 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -256,7 +256,9 @@ public final class StompConnection implements RemotingConnection {
}
}
- public void autoCreateDestinationIfPossible(String queue, AddressInfo.RoutingType routingType) throws ActiveMQStompException {
+ public boolean autoCreateDestinationIfPossible(String queue, AddressInfo.RoutingType routingType) throws ActiveMQStompException {
+ boolean result = false;
+
try {
if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) {
// TODO check here to see if auto-creation is enabled
@@ -266,12 +268,22 @@ public final class StompConnection implements RemotingConnection {
manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST).setAutoCreated(true));
manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, null, true, false, true);
}
+ result = true;
}
} catch (ActiveMQQueueExistsException e) {
// ignore
} catch (Exception e) {
throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
}
+
+ return result;
+ }
+
+ public void checkRoutingSemantics(String destination, AddressInfo.RoutingType routingType) throws ActiveMQStompException {
+ AddressInfo.RoutingType actualRoutingTypeOfAddress = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingType();
+ if (routingType != null && !routingType.equals(actualRoutingTypeOfAddress)) {
+ throw BUNDLE.illegalSemantics(routingType.toString(), actualRoutingTypeOfAddress.toString());
+ }
}
@Override
@@ -629,6 +641,8 @@ public final class StompConnection implements RemotingConnection {
boolean noLocal,
AddressInfo.RoutingType subscriptionType) throws ActiveMQStompException {
autoCreateDestinationIfPossible(destination, subscriptionType);
+ checkDestination(destination);
+ checkRoutingSemantics(destination, subscriptionType);
if (noLocal) {
String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'";
if (selector == null) {
@@ -657,7 +671,7 @@ public final class StompConnection implements RemotingConnection {
} catch (ActiveMQStompException e) {
throw e;
} catch (Exception e) {
- throw BUNDLE.errorCreatSubscription(subscriptionID, e).setHandler(frameHandler);
+ throw BUNDLE.errorCreatingSubscription(subscriptionID, e).setHandler(frameHandler);
}
}
@@ -667,7 +681,7 @@ public final class StompConnection implements RemotingConnection {
} catch (ActiveMQStompException e) {
throw e;
} catch (Exception e) {
- throw BUNDLE.errorUnsubscrib(subscriptionID, e).setHandler(frameHandler);
+ throw BUNDLE.errorUnsubscribing(subscriptionID, e).setHandler(frameHandler);
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 06af785..580bade 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -172,6 +172,7 @@ public abstract class VersionedStompFrameHandler {
AddressInfo.RoutingType routingType = getRoutingType(frame.getHeader(Headers.Send.DESTINATION_TYPE), frame.getHeader(Headers.Send.DESTINATION));
connection.autoCreateDestinationIfPossible(destination, routingType);
connection.checkDestination(destination);
+ connection.checkRoutingSemantics(destination, routingType);
String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
long timestamp = System.currentTimeMillis();
@@ -344,12 +345,15 @@ public abstract class VersionedStompFrameHandler {
}
private AddressInfo.RoutingType getRoutingType(String typeHeader, String destination) {
- AddressInfo.RoutingType routingType = AddressInfo.RoutingType.ANYCAST; // default
+ // null is valid to return here so we know when the user didn't provide any routing info
+ AddressInfo.RoutingType routingType = null;
if (typeHeader != null) {
routingType = AddressInfo.RoutingType.valueOf(typeHeader);
} else if (destination != null && !connection.getAnycastPrefix().equals(connection.getMulticastPrefix())) {
if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) {
routingType = AddressInfo.RoutingType.MULTICAST;
+ } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) {
+ routingType = AddressInfo.RoutingType.ANYCAST;
}
}
return routingType;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 19e9ebe..e7dcc91 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1372,4 +1372,38 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
+
+ @Test
+ public void testMulticastOperationsOnAnycastAddress() throws Exception {
+ testRoutingSemantics(AddressInfo.RoutingType.MULTICAST.toString(), getQueuePrefix() + getQueueName());
+ }
+
+ @Test
+ public void testAnycastOperationsOnMulticastAddress() throws Exception {
+ testRoutingSemantics(AddressInfo.RoutingType.ANYCAST.toString(), getTopicPrefix() + getTopicName());
+ }
+
+ public void testRoutingSemantics(String routingType, String destination) throws Exception {
+ conn.connect(defUser, defPass);
+
+ String uuid = UUID.randomUUID().toString();
+
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+ .addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType)
+ .addHeader(Stomp.Headers.Subscribe.DESTINATION, destination)
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+
+ frame = conn.sendFrame(frame);
+ assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+
+ uuid = UUID.randomUUID().toString();
+
+ frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION_TYPE, AddressInfo.RoutingType.MULTICAST.toString())
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+
+ frame = conn.sendFrame(frame);
+ assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 278d80e..bcac436 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -218,7 +218,6 @@ public abstract class StompTestBase extends ActiveMQTestBase {
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage(msg);
producer.send(message);
- IntegrationTestLogger.LOGGER.info("Sent message from JMS client to: " + destination);
}
public void sendJmsMessage(byte[] data, Destination destination) throws Exception {
@@ -526,6 +525,8 @@ public abstract class StompTestBase extends ActiveMQTestBase {
assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
}
+ IntegrationTestLogger.LOGGER.info("Received: " + frame);
+
return frame;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index 7cb02a3..6eb57b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -2128,24 +2128,6 @@ public class StompV11Test extends StompTestBase {
}
@Test
- public void testSendMessageToNonExistentQueueWithoutAutoCreation() throws Exception {
- AddressSettings addressSettings = new AddressSettings();
- addressSettings.setAutoCreateJmsQueues(false);
- server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
- conn.connect(defUser, defPass);
-
- String uuid = UUID.randomUUID().toString();
-
- ClientStompFrame frame = send(conn, "NonExistentQueue" + uuid, null, "Hello World", true, AddressInfo.RoutingType.ANYCAST);
-
- // TODO fix this test by checking auto-create settings
- assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
- IntegrationTestLogger.LOGGER.info("message: " + frame.getHeader("message"));
-
- conn.disconnect();
- }
-
- @Test
public void testSendMessageToNonExistentQueueWithAutoCreation() throws Exception {
conn.connect(defUser, defPass);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
index 0a52714..dc8cea0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
@@ -2167,21 +2167,6 @@ public class StompV12Test extends StompTestBase {
}
@Test
- public void testSendMessageToNonExistentQueueWithoutAutoCreation() throws Exception {
- AddressSettings addressSettings = new AddressSettings();
- addressSettings.setAutoCreateJmsQueues(false);
- server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
- conn.connect(defUser, defPass);
-
- ClientStompFrame frame = send(conn, "NonExistentQueue" + UUID.randomUUID().toString(), null, "Hello World", true, AddressInfo.RoutingType.ANYCAST);
-
- // TODO this is broken because queue auto-creation is always on
- assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
-
- waitDisconnect(conn);
- }
-
- @Test
public void testSendMessageToNonExistentQueueWithAutoCreation() throws Exception {
conn.connect(defUser, defPass);