You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/23 17:43:10 UTC

[09/48] activemq-artemis git commit: Check routing semantics for STOMP senders/subscribers

Check routing semantics for STOMP senders/subscribers


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c756499c
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c756499c
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c756499c

Branch: refs/heads/ARTEMIS-780
Commit: c756499ccb0b18940c92ab7d15cc1a85e81dd932
Parents: 3020837
Author: jbertram <jb...@apache.com>
Authored: Tue Nov 15 10:27:55 2016 -0600
Committer: jbertram <jb...@apache.com>
Committed: Tue Nov 15 10:38:13 2016 -0600

----------------------------------------------------------------------
 .../protocol/stomp/ActiveMQStompException.java  |  4 +--
 .../ActiveMQStompProtocolMessageBundle.java     |  7 ++--
 .../core/protocol/stomp/StompConnection.java    | 20 ++++++++++--
 .../stomp/VersionedStompFrameHandler.java       |  6 +++-
 .../tests/integration/stomp/StompTest.java      | 34 ++++++++++++++++++++
 .../tests/integration/stomp/StompTestBase.java  |  3 +-
 .../integration/stomp/v11/StompV11Test.java     | 18 -----------
 .../integration/stomp/v12/StompV12Test.java     | 15 ---------
 8 files changed, 65 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
index 118f7f8..69fa130 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompException.java
@@ -40,12 +40,12 @@ public class ActiveMQStompException extends Exception {
    }
 
    public ActiveMQStompException(String msg) {
-      super(msg);
+      super(msg == null ? null : msg.replace(":", "")); // null-safe: a null message must not throw a NullPointerException here
       handler = null;
    }
 
    public ActiveMQStompException(String msg, Throwable t) {
-      super(msg, t);
+      super(msg == null ? null : msg.replace(":", ""), t); // null-safe: StompConnection passes e.getMessage(), which may be null
       this.body = t.getMessage();
       handler = null;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
index e535725..c1f93e4 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/ActiveMQStompProtocolMessageBundle.java
@@ -86,10 +86,10 @@ public interface ActiveMQStompProtocolMessageBundle {
    ActiveMQStompException noDestination();
 
    @Message(id = 339016, value = "Error creating subscription {0}", format = Message.Format.MESSAGE_FORMAT)
-   ActiveMQStompException errorCreatSubscription(String subscriptionID, @Cause Exception e);
+   ActiveMQStompException errorCreatingSubscription(String subscriptionID, @Cause Exception e);
 
    @Message(id = 339017, value = "Error unsubscribing {0}", format = Message.Format.MESSAGE_FORMAT)
-   ActiveMQStompException errorUnsubscrib(String subscriptionID, @Cause Exception e);
+   ActiveMQStompException errorUnsubscribing(String subscriptionID, @Cause Exception e);
 
    @Message(id = 339018, value = "Error acknowledging message {0}", format = Message.Format.MESSAGE_FORMAT)
    ActiveMQStompException errorAck(String messageID, @Cause Exception e);
@@ -150,4 +150,7 @@ public interface ActiveMQStompProtocolMessageBundle {
 
    @Message(id = 339039, value = "No id header in ACK/NACK frame.")
    ActiveMQStompException noIDInAck();
+
+   @Message(id = 339040, value = "Not allowed to specify {0} semantics on {1} address.", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQStompException illegalSemantics(String requested, String exists);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index a6eab6b..eaeb21d 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -256,7 +256,9 @@ public final class StompConnection implements RemotingConnection {
       }
    }
 
-   public void autoCreateDestinationIfPossible(String queue, AddressInfo.RoutingType routingType) throws ActiveMQStompException {
+   public boolean autoCreateDestinationIfPossible(String queue, AddressInfo.RoutingType routingType) throws ActiveMQStompException {
+      boolean result = false;
+
       try {
          if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) {
             // TODO check here to see if auto-creation is enabled
@@ -266,12 +268,22 @@ public final class StompConnection implements RemotingConnection {
                manager.getServer().createOrUpdateAddressInfo(new AddressInfo(SimpleString.toSimpleString(queue)).setRoutingType(AddressInfo.RoutingType.ANYCAST).setAutoCreated(true));
                manager.getServer().createQueue(SimpleString.toSimpleString(queue), SimpleString.toSimpleString(queue), null, null, true, false, true);
             }
+            result = true;
          }
       } catch (ActiveMQQueueExistsException e) {
          // ignore
       } catch (Exception e) {
          throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
       }
+
+      return result;
+   }
+
+   public void checkRoutingSemantics(String destination, AddressInfo.RoutingType routingType) throws ActiveMQStompException {
+      AddressInfo addressInfo = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination));
+      // Reject a requested routing type that differs from the address's; getAddressInfo may return null, then there is nothing to compare
+      if (addressInfo != null && routingType != null && !routingType.equals(addressInfo.getRoutingType())) {
+         throw BUNDLE.illegalSemantics(routingType.toString(), String.valueOf(addressInfo.getRoutingType()));
+      }
    }
 
    @Override
@@ -629,6 +641,8 @@ public final class StompConnection implements RemotingConnection {
                   boolean noLocal,
                   AddressInfo.RoutingType subscriptionType) throws ActiveMQStompException {
       autoCreateDestinationIfPossible(destination, subscriptionType);
+      checkDestination(destination);
+      checkRoutingSemantics(destination, subscriptionType);
       if (noLocal) {
          String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'";
          if (selector == null) {
@@ -657,7 +671,7 @@ public final class StompConnection implements RemotingConnection {
       } catch (ActiveMQStompException e) {
          throw e;
       } catch (Exception e) {
-         throw BUNDLE.errorCreatSubscription(subscriptionID, e).setHandler(frameHandler);
+         throw BUNDLE.errorCreatingSubscription(subscriptionID, e).setHandler(frameHandler);
       }
    }
 
@@ -667,7 +681,7 @@ public final class StompConnection implements RemotingConnection {
       } catch (ActiveMQStompException e) {
          throw e;
       } catch (Exception e) {
-         throw BUNDLE.errorUnsubscrib(subscriptionID, e).setHandler(frameHandler);
+         throw BUNDLE.errorUnsubscribing(subscriptionID, e).setHandler(frameHandler);
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 06af785..580bade 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -172,6 +172,7 @@ public abstract class VersionedStompFrameHandler {
          AddressInfo.RoutingType routingType = getRoutingType(frame.getHeader(Headers.Send.DESTINATION_TYPE), frame.getHeader(Headers.Send.DESTINATION));
          connection.autoCreateDestinationIfPossible(destination, routingType);
          connection.checkDestination(destination);
+         connection.checkRoutingSemantics(destination, routingType);
          String txID = frame.getHeader(Stomp.Headers.TRANSACTION);
 
          long timestamp = System.currentTimeMillis();
@@ -344,12 +345,15 @@ public abstract class VersionedStompFrameHandler {
    }
 
    private AddressInfo.RoutingType getRoutingType(String typeHeader, String destination) {
-      AddressInfo.RoutingType routingType = AddressInfo.RoutingType.ANYCAST; // default
+      // null is valid to return here so we know when the user didn't provide any routing info
+      AddressInfo.RoutingType routingType = null;
       if (typeHeader != null) {
          routingType = AddressInfo.RoutingType.valueOf(typeHeader);
       } else if (destination != null && !connection.getAnycastPrefix().equals(connection.getMulticastPrefix())) {
          if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) {
             routingType = AddressInfo.RoutingType.MULTICAST;
+         } else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) {
+            routingType = AddressInfo.RoutingType.ANYCAST;
          }
       }
       return routingType;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 19e9ebe..e7dcc91 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1372,4 +1372,38 @@ public class StompTest extends StompTestBase {
 
       conn.disconnect();
    }
+
+   @Test
+   public void testMulticastOperationsOnAnycastAddress() throws Exception {
+      testRoutingSemantics(AddressInfo.RoutingType.MULTICAST.toString(), getQueuePrefix() + getQueueName());
+   }
+
+   @Test
+   public void testAnycastOperationsOnMulticastAddress() throws Exception {
+      testRoutingSemantics(AddressInfo.RoutingType.ANYCAST.toString(), getTopicPrefix() + getTopicName());
+   }
+
+   public void testRoutingSemantics(String routingType, String destination) throws Exception {
+      conn.connect(defUser, defPass);
+
+      String uuid = UUID.randomUUID().toString();
+      // SUBSCRIBE requesting routingType on destination is expected to be answered with an ERROR frame
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+                                   .addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, routingType)
+                                   .addHeader(Stomp.Headers.Subscribe.DESTINATION, destination)
+                                   .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+
+      frame = conn.sendFrame(frame);
+      assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+
+      uuid = UUID.randomUUID().toString();
+      // SEND must use the caller's routing type and destination so each test case exercises its own mismatch
+      frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION_TYPE, routingType)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, destination)
+                                   .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+
+      frame = conn.sendFrame(frame);
+      assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 278d80e..bcac436 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -218,7 +218,6 @@ public abstract class StompTestBase extends ActiveMQTestBase {
       MessageProducer producer = session.createProducer(destination);
       TextMessage message = session.createTextMessage(msg);
       producer.send(message);
-      IntegrationTestLogger.LOGGER.info("Sent message from JMS client to: " + destination);
    }
 
    public void sendJmsMessage(byte[] data, Destination destination) throws Exception {
@@ -526,6 +525,8 @@ public abstract class StompTestBase extends ActiveMQTestBase {
          assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
       }
 
+      IntegrationTestLogger.LOGGER.info("Received: " + frame);
+
       return frame;
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index 7cb02a3..6eb57b2 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -2128,24 +2128,6 @@ public class StompV11Test extends StompTestBase {
    }
 
    @Test
-   public void testSendMessageToNonExistentQueueWithoutAutoCreation() throws Exception {
-      AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setAutoCreateJmsQueues(false);
-      server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
-      conn.connect(defUser, defPass);
-
-      String uuid = UUID.randomUUID().toString();
-
-      ClientStompFrame frame = send(conn, "NonExistentQueue" + uuid, null, "Hello World", true, AddressInfo.RoutingType.ANYCAST);
-
-      // TODO fix this test by checking auto-create settings
-      assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
-      IntegrationTestLogger.LOGGER.info("message: " + frame.getHeader("message"));
-
-      conn.disconnect();
-   }
-
-   @Test
    public void testSendMessageToNonExistentQueueWithAutoCreation() throws Exception {
       conn.connect(defUser, defPass);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c756499c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
index 0a52714..dc8cea0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v12/StompV12Test.java
@@ -2167,21 +2167,6 @@ public class StompV12Test extends StompTestBase {
    }
 
    @Test
-   public void testSendMessageToNonExistentQueueWithoutAutoCreation() throws Exception {
-      AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setAutoCreateJmsQueues(false);
-      server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
-      conn.connect(defUser, defPass);
-
-      ClientStompFrame frame = send(conn, "NonExistentQueue" + UUID.randomUUID().toString(), null, "Hello World", true, AddressInfo.RoutingType.ANYCAST);
-
-      // TODO this is broken because queue auto-creation is always on
-      assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
-
-      waitDisconnect(conn);
-   }
-
-   @Test
    public void testSendMessageToNonExistentQueueWithAutoCreation() throws Exception {
       conn.connect(defUser, defPass);