You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/02/09 17:50:05 UTC

[2/3] activemq-6 git commit: ACTIVEMQ6-76 auto queue creation on STOMP send/sub

ACTIVEMQ6-76 auto queue creation on STOMP send/sub

Implements a new broker feature: when a client sends a message to, or
consumes from, a queue via the STOMP protocol, the broker can
automatically create that queue if it is not explicitly defined
through the management API or file-based configuration. Note that the
destination name must start with "jms.queue." to be auto-created. An
auto-created queue may subsequently be deleted once it has no
messages and no consumers. Auto-creation and auto-deletion can each
be turned on or off via address settings.


Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/13104422
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/13104422
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/13104422

Branch: refs/heads/master
Commit: 13104422442c181f87325a8844cf646462faf242
Parents: 593ea2d
Author: jbertram <jb...@redhat.com>
Authored: Tue Feb 3 12:21:09 2015 -0600
Committer: jbertram <jb...@redhat.com>
Committed: Mon Feb 9 09:03:58 2015 -0600

----------------------------------------------------------------------
 .../stomp/ActiveMQStompProtocolLogger.java      |  3 +-
 .../core/protocol/stomp/StompConnection.java    | 29 ++++++
 .../protocol/stomp/StompProtocolManager.java    |  5 ++
 .../activemq/core/server/ActiveMQServer.java    |  2 +-
 .../core/server/impl/ActiveMQServerImpl.java    |  2 +-
 .../impl/AutoCreatedQueueManagerImpl.java       |  2 +-
 .../tests/integration/stomp/StompTest.java      | 95 ++++++++++++++++++++
 .../tests/integration/stomp/StompTestBase.java  |  4 +-
 .../integration/stomp/v11/StompV11Test.java     | 26 +++++-
 .../integration/stomp/v12/StompV12Test.java     | 30 ++++++-
 10 files changed, 190 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java
index e66d1ed..236b38c 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.core.protocol.stomp;
 
+import org.jboss.logging.BasicLogger;
 import org.jboss.logging.Logger;
 import org.jboss.logging.annotations.Cause;
 import org.jboss.logging.annotations.LogMessage;
@@ -41,7 +42,7 @@ import org.jboss.logging.annotations.MessageLogger;
  */
 
 @MessageLogger(projectCode = "AMQ")
-public interface ActiveMQStompProtocolLogger
+public interface ActiveMQStompProtocolLogger extends BasicLogger
 {
    /**
     * The default logger.

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
index 23d399d..16cf55e 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
@@ -27,7 +27,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import org.apache.activemq.api.core.ActiveMQBuffer;
 import org.apache.activemq.api.core.ActiveMQBuffers;
 import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.SimpleString;
 import org.apache.activemq.api.core.client.ActiveMQClient;
+import org.apache.activemq.api.core.management.ResourceNames;
 import org.apache.activemq.core.protocol.stomp.v10.StompFrameHandlerV10;
 import org.apache.activemq.core.protocol.stomp.v12.StompFrameHandlerV12;
 import org.apache.activemq.core.remoting.CloseListener;
@@ -245,12 +247,38 @@ public final class StompConnection implements RemotingConnection
 
    public void checkDestination(String destination) throws ActiveMQStompException
    {
+      if (autoCreateQueueIfPossible(destination))
+      {
+         return;
+      }
+
       if (!manager.destinationExists(destination))
       {
          throw BUNDLE.destinationNotExist(destination);
       }
    }
 
+   public boolean autoCreateQueueIfPossible(String queue) throws ActiveMQStompException
+   {
+      boolean autoCreated = false;
+
+      if (queue.startsWith(ResourceNames.JMS_QUEUE) && manager.getServer().getAddressSettingsRepository().getMatch(queue).isAutoCreateJmsQueues() && manager.getServer().locateQueue(new SimpleString(queue)) == null)
+      {
+         SimpleString queueName = new SimpleString(queue);
+         try
+         {
+            manager.getServer().createQueue(queueName, queueName, null, true, false, true);
+         }
+         catch (Exception e)
+         {
+            throw new ActiveMQStompException(e.getMessage(), e);
+         }
+         autoCreated = true;
+      }
+
+      return autoCreated;
+   }
+
    @Override
    public ActiveMQBuffer createBuffer(int size)
    {
@@ -689,6 +717,7 @@ public final class StompConnection implements RemotingConnection
    void subscribe(String destination, String selector, String ack,
                   String id, String durableSubscriptionName, boolean noLocal) throws ActiveMQStompException
    {
+      autoCreateQueueIfPossible(destination);
       if (noLocal)
       {
          String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'";

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
index e73e9e4..6ce03db 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
@@ -500,4 +500,9 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
             break;
       }
    }
+
+   public ActiveMQServer getServer()
+   {
+      return server;
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
index 2054be1..2197690 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
@@ -190,7 +190,7 @@ public interface ActiveMQServer extends ActiveMQComponent
                      boolean durable,
                      boolean temporary) throws Exception;
 
-   Queue locateQueue(SimpleString queueName) throws Exception;
+   Queue locateQueue(SimpleString queueName);
 
    void destroyQueue(SimpleString queueName) throws Exception;
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
index cb91351..469ece2 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
@@ -1234,7 +1234,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
    }
 
 
-   public Queue locateQueue(SimpleString queueName) throws Exception
+   public Queue locateQueue(SimpleString queueName)
    {
       Binding binding = postOffice.getBinding(queueName);
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java
index 5bcd2c6..05cc9cf 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java
@@ -24,7 +24,7 @@ import org.apache.activemq.core.server.Queue;
 import org.apache.activemq.utils.ReferenceCounterUtil;
 
 /**
- * @author Clebert Suconic
+ * @author Justin Bertram
  */
 
 public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java
index 072b2e7..b32e1bf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java
@@ -30,8 +30,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.management.ResourceNames;
+import org.apache.activemq.api.jms.ActiveMQJMSClient;
 import org.apache.activemq.core.protocol.stomp.Stomp;
 import org.apache.activemq.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.tests.util.RandomUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -147,6 +151,40 @@ public class StompTest extends StompTestBase
       Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
    }
 
+   @Test
+   public void testSendMessageToNonExistentQueue() throws Exception
+   {
+      String nonExistantQueue = RandomUtil.randomString();
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      frame = "SEND\n" + "destination:" + getQueuePrefix() + nonExistantQueue + "\n\n" + "Hello World" + Stomp.NULL;
+
+      sendFrame(frame);
+      receiveFrame(1000);
+
+      MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue(nonExistantQueue));
+      TextMessage message = (TextMessage) consumer.receive(1000);
+      Assert.assertNotNull(message);
+      Assert.assertEquals("Hello World", message.getText());
+      // Assert default priority 4 is used when priority header is not set
+      Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
+
+      // Make sure that the timestamp is valid - should
+      // be very close to the current time.
+      long tnow = System.currentTimeMillis();
+      long tmsg = message.getJMSTimestamp();
+      Assert.assertTrue(Math.abs(tnow - tmsg) < 1500);
+
+      // closing the consumer here should trigger auto-deletion
+      assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
+      consumer.close();
+      assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
+   }
+
    /*
     * Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame
     * This means next frame read might have a \n a the beginning.
@@ -1095,6 +1133,63 @@ public class StompTest extends StompTestBase
    }
 
    @Test
+   public void testSubscribeToNonExistantQueue() throws Exception
+   {
+      String nonExistantQueue = RandomUtil.randomString();
+
+      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+      sendFrame(frame);
+
+      frame = receiveFrame(100000);
+      Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+      frame = "SUBSCRIBE\n" + "destination:" +
+         getQueuePrefix() +
+         nonExistantQueue +
+         "\n" +
+         "receipt: 12\n" +
+         "\n\n" +
+         Stomp.NULL;
+      sendFrame(frame);
+      // wait for SUBSCRIBE's receipt
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+      sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistantQueue));
+
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("MESSAGE"));
+      Assert.assertTrue(frame.indexOf("destination:") > 0);
+      Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+      assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
+
+      frame = "UNSUBSCRIBE\n" + "destination:" +
+         getQueuePrefix() +
+         nonExistantQueue +
+         "\n" +
+         "receipt: 1234\n" +
+         "\n\n" +
+         Stomp.NULL;
+      sendFrame(frame);
+      // wait for UNSUBSCRIBE's receipt
+      frame = receiveFrame(10000);
+      Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+      assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
+
+      sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistantQueue));
+
+      frame = receiveFrame(1000);
+      log.info("Received frame: " + frame);
+      Assert.assertNull("No message should have been received since subscription was removed", frame);
+
+
+      frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+      sendFrame(frame);
+   }
+
+   @Test
    public void testDurableSubscriberWithReconnection() throws Exception
    {
 

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
index 3aec083..e7c7819 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
@@ -114,6 +114,7 @@ public abstract class StompTestBase extends UnitTestCase
       if (autoCreateServer)
       {
          server = createServer();
+         addServer(server.getActiveMQServer());
          server.start();
          connectionFactory = createConnectionFactory();
          createBootstrap();
@@ -231,9 +232,8 @@ public abstract class StompTestBase extends UnitTestCase
          if (group != null)
          {
             channel.close();
-            group.shutdown();
+            group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS);
          }
-         server.stop();
       }
       super.tearDown();
    }

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java
index 0984353..5117952 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java
@@ -30,6 +30,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.core.settings.impl.AddressSettings;
 import org.apache.activemq.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame;
 import org.apache.activemq.tests.integration.stomp.util.StompClientConnection;
@@ -2354,8 +2355,11 @@ public class StompV11Test extends StompV11TestBase
    }
 
    @Test
-   public void testSendMessageToNonExistentJmsQueue() throws Exception
+   public void testSendMessageToNonExistentJmsQueueWithoutAutoCreation() throws Exception
    {
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setAutoCreateJmsQueues(false);
+      server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
       connV11.connect(defUser, defPass);
 
       ClientStompFrame frame = connV11.createFrame("SEND");
@@ -2374,6 +2378,26 @@ public class StompV11Test extends StompV11TestBase
    }
 
    @Test
+   public void testSendMessageToNonExistentJmsQueueWithAutoCreation() throws Exception
+   {
+      connV11.connect(defUser, defPass);
+
+      ClientStompFrame frame = connV11.createFrame("SEND");
+      String guid = UUID.randomUUID().toString();
+      frame.addHeader("destination", "jms.queue.NonExistentQueue" + guid);
+      frame.addHeader("receipt", "1234");
+      frame.setBody("Hello World");
+
+      frame = connV11.sendFrame(frame);
+
+      assertTrue(frame.getCommand().equals("RECEIPT"));
+      assertEquals("1234", frame.getHeader("receipt-id"));
+      System.out.println("message: " + frame.getHeader("message"));
+
+      connV11.disconnect();
+   }
+
+   @Test
    public void testSendAndReceiveWithEscapedCharactersInSenderId() throws Exception
    {
       connV11.connect(defUser, defPass);

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java
index 3b6e471..7a308af 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java
@@ -31,6 +31,7 @@ import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.activemq.core.settings.impl.AddressSettings;
 import org.apache.activemq.tests.integration.IntegrationTestLogger;
 import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame;
 import org.apache.activemq.tests.integration.stomp.util.StompClientConnection;
@@ -400,6 +401,10 @@ public class StompV12Test extends StompV11TestBase
    @Test
    public void testHeaderRepetitive() throws Exception
    {
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setAutoCreateJmsQueues(false);
+      server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
+
       connV12.connect(defUser, defPass);
       ClientStompFrame frame = connV12.createFrame("SEND");
 
@@ -2617,8 +2622,11 @@ public class StompV12Test extends StompV11TestBase
    }
 
    @Test
-   public void testSendMessageToNonExistentJmsQueue() throws Exception
+   public void testSendMessageToNonExistentJmsQueueWithoutAutoCreation() throws Exception
    {
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setAutoCreateJmsQueues(false);
+      server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
       connV12.connect(defUser, defPass);
 
       ClientStompFrame frame = connV12.createFrame("SEND");
@@ -2637,6 +2645,26 @@ public class StompV12Test extends StompV11TestBase
    }
 
    @Test
+   public void testSendMessageToNonExistentJmsQueueWithAutoCreation() throws Exception
+   {
+      connV12.connect(defUser, defPass);
+
+      ClientStompFrame frame = connV12.createFrame("SEND");
+      String guid = UUID.randomUUID().toString();
+      frame.addHeader("destination", "jms.queue.NonExistentQueue" + guid);
+      frame.addHeader("receipt", "1234");
+      frame.setBody("Hello World");
+
+      frame = connV12.sendFrame(frame);
+
+      assertTrue(frame.getCommand().equals("RECEIPT"));
+      assertEquals("1234", frame.getHeader("receipt-id"));
+      System.out.println("message: " + frame.getHeader("message"));
+
+      connV12.disconnect();
+   }
+
+   @Test
    public void testInvalidStompCommand() throws Exception
    {
       try