You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/04/27 01:20:12 UTC

[1/2] activemq-artemis git commit: This closes #2042

Repository: activemq-artemis
Updated Branches:
  refs/heads/master e937c9903 -> d18e8439b


This closes #2042


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d18e8439
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d18e8439
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d18e8439

Branch: refs/heads/master
Commit: d18e8439b5c896125806a557f163548722e32aff
Parents: e937c99 d773e8f
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Apr 26 21:20:07 2018 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 26 21:20:07 2018 -0400

----------------------------------------------------------------------
 .../core/protocol/stomp/StompConnection.java    |  38 +++--
 .../stomp/VersionedStompFrameHandler.java       |  12 +-
 .../stomp/v10/StompFrameHandlerV10.java         |  46 +++---
 .../tests/integration/stomp/StompTest.java      | 155 +++++++++++++++++++
 4 files changed, 219 insertions(+), 32 deletions(-)
----------------------------------------------------------------------



[2/2] activemq-artemis git commit: ARTEMIS-1794 STOMP clients using same addr w/diff routing types

Posted by cl...@apache.org.
ARTEMIS-1794 STOMP clients using same addr w/diff routing types


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d773e8f6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d773e8f6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d773e8f6

Branch: refs/heads/master
Commit: d773e8f66b5db2b0fcaa646384c24690d38aaa69
Parents: e937c99
Author: Justin Bertram <jb...@apache.org>
Authored: Tue Apr 10 10:41:15 2018 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Apr 26 21:20:07 2018 -0400

----------------------------------------------------------------------
 .../core/protocol/stomp/StompConnection.java    |  38 +++--
 .../stomp/VersionedStompFrameHandler.java       |  12 +-
 .../stomp/v10/StompFrameHandlerV10.java         |  46 +++---
 .../tests/integration/stomp/StompTest.java      | 155 +++++++++++++++++++
 4 files changed, 219 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d773e8f6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
index 32e64b3..fbd0107 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompConnection.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -262,7 +263,7 @@ public final class StompConnection implements RemotingConnection {
 
    // TODO this should take a type - send or receive so it knows whether to check the address or the queue
    public void checkDestination(String destination) throws ActiveMQStompException {
-      if (!manager.destinationExists(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString())) {
+      if (!manager.destinationExists(destination)) {
          throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
       }
    }
@@ -272,28 +273,47 @@ public final class StompConnection implements RemotingConnection {
 
       try {
          SimpleString simpleQueue = SimpleString.toSimpleString(queue);
-         if (manager.getServer().getAddressInfo(simpleQueue) == null) {
-            AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
-
-            RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType;
+         AddressInfo addressInfo = manager.getServer().getAddressInfo(simpleQueue);
+         AddressSettings addressSettings = manager.getServer().getAddressSettingsRepository().getMatch(queue);
+         RoutingType effectiveAddressRoutingType = routingType == null ? addressSettings.getDefaultAddressRoutingType() : routingType;
+         boolean checkAnycast = false;
+         /**
+          * If the address doesn't exist then it is created if possible.
+          * If the address does exist but doesn't support the routing-type then the address is updated if possible.
+          */
+         if (addressInfo == null) {
             if (addressSettings.isAutoCreateAddresses()) {
                session.createAddress(simpleQueue, effectiveAddressRoutingType, true);
             }
 
-            // only auto create the queue if the address is ANYCAST
-            if (effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues()) {
-               session.createQueue(simpleQueue, simpleQueue, routingType == null ? addressSettings.getDefaultQueueRoutingType() : routingType, null, false, true, true);
+            checkAnycast = true;
+         } else if (!addressInfo.getRoutingTypes().contains(effectiveAddressRoutingType)) {
+            if (addressSettings.isAutoCreateAddresses()) {
+               EnumSet<RoutingType> routingTypes = EnumSet.noneOf(RoutingType.class);
+               for (RoutingType existingRoutingType : addressInfo.getRoutingTypes()) {
+                  routingTypes.add(existingRoutingType);
+               }
+               routingTypes.add(effectiveAddressRoutingType);
+               manager.getServer().updateAddressInfo(simpleQueue, routingTypes);
             }
+
+            checkAnycast = true;
+         }
+
+         // only auto create the queue if the address is ANYCAST
+         if (checkAnycast && effectiveAddressRoutingType == RoutingType.ANYCAST && addressSettings.isAutoCreateQueues()) {
+            session.createQueue(simpleQueue, simpleQueue, routingType == null ? addressSettings.getDefaultQueueRoutingType() : routingType, null, false, true, true);
          }
       } catch (ActiveMQQueueExistsException e) {
          // ignore
       } catch (Exception e) {
+         ActiveMQStompProtocolLogger.LOGGER.debug("Exception while auto-creating destination", e);
          throw new ActiveMQStompException(e.getMessage(), e).setHandler(frameHandler);
       }
    }
 
    public void checkRoutingSemantics(String destination, RoutingType routingType) throws ActiveMQStompException {
-      AddressInfo addressInfo = manager.getServer().getAddressInfo(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)));
+      AddressInfo addressInfo = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination));
 
       // may be null here if, for example, the management address is being checked
       if (addressInfo != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d773e8f6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
index 3cb5ab8..941cee6 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/VersionedStompFrameHandler.java
@@ -279,8 +279,16 @@ public abstract class VersionedStompFrameHandler {
       return connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
    }
 
-   public String getDestination(StompFrame request) {
-      return request.getHeader(Headers.Subscribe.DESTINATION);
+   public String getDestination(StompFrame request) throws ActiveMQStompException {
+      return getDestination(request, Headers.Subscribe.DESTINATION);
+   }
+
+   public String getDestination(StompFrame request, String header) throws ActiveMQStompException {
+      String destination = request.getHeader(header);
+      if (destination == null) {
+         return null;
+      }
+      return connection.getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString();
    }
 
    public StompFrame postprocess(StompFrame request) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d773e8f6/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
index a6785b7..2011e05 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/v10/StompFrameHandlerV10.java
@@ -94,32 +94,36 @@ public class StompFrameHandlerV10 extends VersionedStompFrameHandler implements
    @Override
    public StompFrame onUnsubscribe(StompFrame request) {
       StompFrame response = null;
-      String destination = request.getHeader(Stomp.Headers.Unsubscribe.DESTINATION);
-      String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
-      String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
-      if (durableSubscriptionName == null) {
-         durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
-      }
-      if (durableSubscriptionName == null) {
-         durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
-      }
+      try {
+         String destination = getDestination(request, Stomp.Headers.Unsubscribe.DESTINATION);
+         String id = request.getHeader(Stomp.Headers.Unsubscribe.ID);
+         String durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIBER_NAME);
+         if (durableSubscriptionName == null) {
+            durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME);
+         }
+         if (durableSubscriptionName == null) {
+            durableSubscriptionName = request.getHeader(Stomp.Headers.Unsubscribe.ACTIVEMQ_DURABLE_SUBSCRIPTION_NAME);
+         }
 
-      String subscriptionID = null;
-      if (id != null) {
-         subscriptionID = id;
-      } else {
-         if (destination == null) {
-            ActiveMQStompException error = BUNDLE.needIDorDestination().setHandler(this);
-            response = error.getFrame();
-            return response;
+         String subscriptionID = null;
+         if (id != null) {
+            subscriptionID = id;
+         } else {
+            if (destination == null) {
+               ActiveMQStompException error = BUNDLE.needIDorDestination().setHandler(this);
+               response = error.getFrame();
+               return response;
+            }
+            subscriptionID = "subscription/" + destination;
          }
-         subscriptionID = "subscription/" + destination;
-      }
 
-      try {
          connection.unsubscribe(subscriptionID, durableSubscriptionName);
+
       } catch (ActiveMQStompException e) {
-         return e.getFrame();
+         response = e.getFrame();
+      } catch (Exception e) {
+         ActiveMQStompException error = BUNDLE.errorHandleSend(e).setHandler(this);
+         response = error.getFrame();
       }
       return response;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d773e8f6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index cd18d0c..de6a11d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -1573,6 +1573,159 @@ public class StompTest extends StompTestBase {
       conn.disconnect();
    }
 
+   /**
+    * This test and testPrefixedAutoCreatedMulticastAndAnycastWithSameName are basically the same but doing the
+    * operations in opposite order. In this test the anycast subscription is created first.
+    * @throws Exception
+    */
+   @Test
+   public void testPrefixedAutoCreatedAnycastAndMulticastWithSameName() throws Exception {
+      int port = 61614;
+
+      URI uri = createStompClientUri(scheme, hostname, port);
+
+      final String ADDRESS = UUID.randomUUID().toString();
+      server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=STOMP&anycastPrefix=/queue/&multicastPrefix=/topic/").start();
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
+      conn.connect(defUser, defPass);
+
+      // since this queue doesn't exist the broker should create a new ANYCAST address & queue
+      String uuid = UUID.randomUUID().toString();
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+                                   .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS)
+                                   .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+      frame = conn.sendFrame(frame);
+      assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+
+      AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
+      assertNotNull("No address was created with the name " + ADDRESS, addressInfo);
+      assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
+      assertFalse(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
+      assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(ADDRESS)));
+
+      // sending a MULTICAST message should alter the address to support MULTICAST
+      frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 1", true);
+      assertFalse(frame.getCommand().equals("ERROR"));
+      addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
+      assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
+      assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
+
+      // however, no message should be routed to the ANYCAST queue
+      frame = conn.receiveFrame(1000);
+      Assert.assertNull(frame);
+
+      // sending a message to the ANYCAST queue, should be received
+      frame = send(conn, "/queue/" + ADDRESS, null, "Hello World 2", true);
+      assertFalse(frame.getCommand().equals("ERROR"));
+      frame = conn.receiveFrame(1000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals("Hello World 2", frame.getBody());
+      Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+      frame = conn.receiveFrame(1000);
+      Assert.assertNull(frame);
+
+      unsubscribe(conn, null, "/queue/" + ADDRESS, true, false);
+
+      // now subscribe to the address in a MULTICAST way which will create a MULTICAST queue for the subscription
+      uuid = UUID.randomUUID().toString();
+      frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+                  .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS)
+                  .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+      frame = conn.sendFrame(frame);
+      assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+
+      // send a message which will be routed to the MULTICAST queue
+      frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 3", true);
+      assertFalse(frame.getCommand().equals("ERROR"));
+
+      // receive that message on the topic subscription
+      frame = conn.receiveFrame(1000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals("Hello World 3", frame.getBody());
+      Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+      frame = conn.receiveFrame(1000);
+      Assert.assertNull(frame);
+
+      unsubscribe(conn, null, "/topic/" + ADDRESS, true, false);
+
+      conn.disconnect();
+   }
+
+   /**
+    * This test and testPrefixedAutoCreatedMulticastAndAnycastWithSameName are basically the same but doing the
+    * operations in opposite order. In this test the multicast subscription is created first.
+    * @throws Exception
+    */
+   @Test
+   public void testPrefixedAutoCreatedMulticastAndAnycastWithSameName() throws Exception {
+      int port = 61614;
+
+      URI uri = createStompClientUri(scheme, hostname, port);
+
+      final String ADDRESS = UUID.randomUUID().toString();
+      server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=STOMP&anycastPrefix=/queue/&multicastPrefix=/topic/").start();
+      StompClientConnection conn = StompClientConnectionFactory.createClientConnection(uri);
+      conn.connect(defUser, defPass);
+
+      // since this queue doesn't exist the broker should create a new MULTICAST address
+      String uuid = UUID.randomUUID().toString();
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+                                   .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/topic/" + ADDRESS)
+                                   .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+      frame = conn.sendFrame(frame);
+      assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+
+      AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
+      assertNotNull("No address was created with the name " + ADDRESS, addressInfo);
+      assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
+      assertFalse(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
+
+      // sending an ANYCAST message should alter the address to support ANYCAST and create an ANYCAST queue
+      frame = send(conn, "/queue/" + ADDRESS, null, "Hello World 1", true);
+      assertFalse(frame.getCommand().equals("ERROR"));
+      addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
+      assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.ANYCAST));
+      assertTrue(addressInfo.getRoutingTypes().contains(RoutingType.MULTICAST));
+      assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(ADDRESS)));
+
+      // however, no message should be routed to the MULTICAST queue
+      frame = conn.receiveFrame(1000);
+      Assert.assertNull(frame);
+
+      // sending a message to the MULTICAST queue, should be received
+      frame = send(conn, "/topic/" + ADDRESS, null, "Hello World 2", true);
+      assertFalse(frame.getCommand().equals("ERROR"));
+      frame = conn.receiveFrame(2000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals("Hello World 2", frame.getBody());
+      Assert.assertEquals(RoutingType.MULTICAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+      frame = conn.receiveFrame(1000);
+      Assert.assertNull(frame);
+
+      frame = unsubscribe(conn, null, "/topic/" + ADDRESS, true, false);
+      assertFalse(frame.getCommand().equals("ERROR"));
+
+      // now subscribe to the address in an ANYCAST way
+      uuid = UUID.randomUUID().toString();
+      frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+                  .addHeader(Stomp.Headers.Subscribe.DESTINATION, "/queue/" + ADDRESS)
+                  .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+      frame = conn.sendFrame(frame);
+      assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+
+      // receive that message on the ANYCAST queue
+      frame = conn.receiveFrame(1000);
+      Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      Assert.assertEquals("Hello World 1", frame.getBody());
+      Assert.assertEquals(RoutingType.ANYCAST.toString(), frame.getHeader(Stomp.Headers.Send.DESTINATION_TYPE));
+      frame = conn.receiveFrame(2000);
+      Assert.assertNull(frame);
+
+      unsubscribe(conn, null, "/queue/" + ADDRESS, true, false);
+
+      conn.disconnect();
+   }
+
    @Test
    public void testDotPrefixedSendAndRecieveAnycast() throws Exception {
       testPrefixedSendAndRecieve("jms.queue.", RoutingType.ANYCAST);
@@ -1626,11 +1779,13 @@ public class StompTest extends StompTestBase {
 
    @Test
    public void testMulticastOperationsOnAnycastAddress() throws Exception {
+      server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
       testRoutingSemantics(RoutingType.MULTICAST.toString(), getQueuePrefix() + getQueueName());
    }
 
    @Test
    public void testAnycastOperationsOnMulticastAddress() throws Exception {
+      server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", new AddressSettings().setAutoCreateAddresses(false).setAutoCreateQueues(false));
       testRoutingSemantics(RoutingType.ANYCAST.toString(), getTopicPrefix() + getTopicName());
    }