You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/02/09 17:50:05 UTC
[2/3] activemq-6 git commit: ACTIVEMQ6-76 auto queue creation on
STOMP send/sub
ACTIVEMQ6-76 auto queue creation on STOMP send/sub
Implements a new feature for the broker whereby it may automatically
create and delete queues which are not explicitly defined through
the management API or file-based configuration when a client sends a
message to or consumes from a queue via the STOMP protocol. Note that
the destination name must start with "jms.queue." for the queue to be
auto-created. The queue may subsequently be deleted once it no longer
has any messages or consumers. Auto-creation and auto-deletion can
both be turned on/off via address settings.
Project: http://git-wip-us.apache.org/repos/asf/activemq-6/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-6/commit/13104422
Tree: http://git-wip-us.apache.org/repos/asf/activemq-6/tree/13104422
Diff: http://git-wip-us.apache.org/repos/asf/activemq-6/diff/13104422
Branch: refs/heads/master
Commit: 13104422442c181f87325a8844cf646462faf242
Parents: 593ea2d
Author: jbertram <jb...@redhat.com>
Authored: Tue Feb 3 12:21:09 2015 -0600
Committer: jbertram <jb...@redhat.com>
Committed: Mon Feb 9 09:03:58 2015 -0600
----------------------------------------------------------------------
.../stomp/ActiveMQStompProtocolLogger.java | 3 +-
.../core/protocol/stomp/StompConnection.java | 29 ++++++
.../protocol/stomp/StompProtocolManager.java | 5 ++
.../activemq/core/server/ActiveMQServer.java | 2 +-
.../core/server/impl/ActiveMQServerImpl.java | 2 +-
.../impl/AutoCreatedQueueManagerImpl.java | 2 +-
.../tests/integration/stomp/StompTest.java | 95 ++++++++++++++++++++
.../tests/integration/stomp/StompTestBase.java | 4 +-
.../integration/stomp/v11/StompV11Test.java | 26 +++++-
.../integration/stomp/v12/StompV12Test.java | 30 ++++++-
10 files changed, 190 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java
index e66d1ed..236b38c 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/ActiveMQStompProtocolLogger.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.core.protocol.stomp;
+import org.jboss.logging.BasicLogger;
import org.jboss.logging.Logger;
import org.jboss.logging.annotations.Cause;
import org.jboss.logging.annotations.LogMessage;
@@ -41,7 +42,7 @@ import org.jboss.logging.annotations.MessageLogger;
*/
@MessageLogger(projectCode = "AMQ")
-public interface ActiveMQStompProtocolLogger
+public interface ActiveMQStompProtocolLogger extends BasicLogger
{
/**
* The default logger.
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
index 23d399d..16cf55e 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java
@@ -27,7 +27,9 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.api.core.ActiveMQBuffer;
import org.apache.activemq.api.core.ActiveMQBuffers;
import org.apache.activemq.api.core.ActiveMQException;
+import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.client.ActiveMQClient;
+import org.apache.activemq.api.core.management.ResourceNames;
import org.apache.activemq.core.protocol.stomp.v10.StompFrameHandlerV10;
import org.apache.activemq.core.protocol.stomp.v12.StompFrameHandlerV12;
import org.apache.activemq.core.remoting.CloseListener;
@@ -245,12 +247,38 @@ public final class StompConnection implements RemotingConnection
public void checkDestination(String destination) throws ActiveMQStompException
{
+ if (autoCreateQueueIfPossible(destination))
+ {
+ return;
+ }
+
if (!manager.destinationExists(destination))
{
throw BUNDLE.destinationNotExist(destination);
}
}
+ public boolean autoCreateQueueIfPossible(String queue) throws ActiveMQStompException
+ {
+ boolean autoCreated = false;
+
+ if (queue.startsWith(ResourceNames.JMS_QUEUE) && manager.getServer().getAddressSettingsRepository().getMatch(queue).isAutoCreateJmsQueues() && manager.getServer().locateQueue(new SimpleString(queue)) == null)
+ {
+ SimpleString queueName = new SimpleString(queue);
+ try
+ {
+ manager.getServer().createQueue(queueName, queueName, null, true, false, true);
+ }
+ catch (Exception e)
+ {
+ throw new ActiveMQStompException(e.getMessage(), e);
+ }
+ autoCreated = true;
+ }
+
+ return autoCreated;
+ }
+
@Override
public ActiveMQBuffer createBuffer(int size)
{
@@ -689,6 +717,7 @@ public final class StompConnection implements RemotingConnection
void subscribe(String destination, String selector, String ack,
String id, String durableSubscriptionName, boolean noLocal) throws ActiveMQStompException
{
+ autoCreateQueueIfPossible(destination);
if (noLocal)
{
String noLocalFilter = CONNECTION_ID_PROP + " <> '" + getID().toString() + "'";
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
index e73e9e4..6ce03db 100644
--- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
+++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompProtocolManager.java
@@ -500,4 +500,9 @@ class StompProtocolManager implements ProtocolManager, NotificationListener
break;
}
}
+
+ public ActiveMQServer getServer()
+ {
+ return server;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
index 2054be1..2197690 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/ActiveMQServer.java
@@ -190,7 +190,7 @@ public interface ActiveMQServer extends ActiveMQComponent
boolean durable,
boolean temporary) throws Exception;
- Queue locateQueue(SimpleString queueName) throws Exception;
+ Queue locateQueue(SimpleString queueName);
void destroyQueue(SimpleString queueName) throws Exception;
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
index cb91351..469ece2 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/ActiveMQServerImpl.java
@@ -1234,7 +1234,7 @@ public class ActiveMQServerImpl implements ActiveMQServer
}
- public Queue locateQueue(SimpleString queueName) throws Exception
+ public Queue locateQueue(SimpleString queueName)
{
Binding binding = postOffice.getBinding(queueName);
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java
----------------------------------------------------------------------
diff --git a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java
index 5bcd2c6..05cc9cf 100644
--- a/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java
+++ b/activemq-server/src/main/java/org/apache/activemq/core/server/impl/AutoCreatedQueueManagerImpl.java
@@ -24,7 +24,7 @@ import org.apache.activemq.core.server.Queue;
import org.apache.activemq.utils.ReferenceCounterUtil;
/**
- * @author Clebert Suconic
+ * @author Justin Bertram
*/
public class AutoCreatedQueueManagerImpl implements AutoCreatedQueueManager
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java
index 072b2e7..b32e1bf 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTest.java
@@ -30,8 +30,12 @@ import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.management.ResourceNames;
+import org.apache.activemq.api.jms.ActiveMQJMSClient;
import org.apache.activemq.core.protocol.stomp.Stomp;
import org.apache.activemq.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.tests.util.RandomUtil;
import org.junit.Assert;
import org.junit.Test;
@@ -147,6 +151,40 @@ public class StompTest extends StompTestBase
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
+ @Test
+ public void testSendMessageToNonExistentQueue() throws Exception
+ {
+ String nonExistantQueue = RandomUtil.randomString();
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SEND\n" + "destination:" + getQueuePrefix() + nonExistantQueue + "\n\n" + "Hello World" + Stomp.NULL;
+
+ sendFrame(frame);
+ receiveFrame(1000);
+
+ MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue(nonExistantQueue));
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+ // Assert default priority 4 is used when priority header is not set
+ Assert.assertEquals("getJMSPriority", 4, message.getJMSPriority());
+
+ // Make sure that the timestamp is valid - should
+ // be very close to the current time.
+ long tnow = System.currentTimeMillis();
+ long tmsg = message.getJMSTimestamp();
+ Assert.assertTrue(Math.abs(tnow - tmsg) < 1500);
+
+ // closing the consumer here should trigger auto-deletion
+ assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
+ consumer.close();
+ assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
+ }
+
/*
* Some STOMP clients erroneously put a new line \n *after* the terminating NUL char at the end of the frame
* This means next frame read might have a \n a the beginning.
@@ -1095,6 +1133,63 @@ public class StompTest extends StompTestBase
}
@Test
+ public void testSubscribeToNonExistantQueue() throws Exception
+ {
+ String nonExistantQueue = RandomUtil.randomString();
+
+ String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
+ sendFrame(frame);
+
+ frame = receiveFrame(100000);
+ Assert.assertTrue(frame.startsWith("CONNECTED"));
+
+ frame = "SUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ nonExistantQueue +
+ "\n" +
+ "receipt: 12\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ // wait for SUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistantQueue));
+
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertTrue(frame.indexOf("destination:") > 0);
+ Assert.assertTrue(frame.indexOf(getName()) > 0);
+
+ assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
+
+ frame = "UNSUBSCRIBE\n" + "destination:" +
+ getQueuePrefix() +
+ nonExistantQueue +
+ "\n" +
+ "receipt: 1234\n" +
+ "\n\n" +
+ Stomp.NULL;
+ sendFrame(frame);
+ // wait for UNSUBSCRIBE's receipt
+ frame = receiveFrame(10000);
+ Assert.assertTrue(frame.startsWith("RECEIPT"));
+
+ assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(ResourceNames.JMS_QUEUE + nonExistantQueue)));
+
+ sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistantQueue));
+
+ frame = receiveFrame(1000);
+ log.info("Received frame: " + frame);
+ Assert.assertNull("No message should have been received since subscription was removed", frame);
+
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ sendFrame(frame);
+ }
+
+ @Test
public void testDurableSubscriberWithReconnection() throws Exception
{
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
index 3aec083..e7c7819 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/StompTestBase.java
@@ -114,6 +114,7 @@ public abstract class StompTestBase extends UnitTestCase
if (autoCreateServer)
{
server = createServer();
+ addServer(server.getActiveMQServer());
server.start();
connectionFactory = createConnectionFactory();
createBootstrap();
@@ -231,9 +232,8 @@ public abstract class StompTestBase extends UnitTestCase
if (group != null)
{
channel.close();
- group.shutdown();
+ group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS);
}
- server.stop();
}
super.tearDown();
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java
index 0984353..5117952 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v11/StompV11Test.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.core.settings.impl.AddressSettings;
import org.apache.activemq.tests.integration.IntegrationTestLogger;
import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.tests.integration.stomp.util.StompClientConnection;
@@ -2354,8 +2355,11 @@ public class StompV11Test extends StompV11TestBase
}
@Test
- public void testSendMessageToNonExistentJmsQueue() throws Exception
+ public void testSendMessageToNonExistentJmsQueueWithoutAutoCreation() throws Exception
{
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setAutoCreateJmsQueues(false);
+ server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
connV11.connect(defUser, defPass);
ClientStompFrame frame = connV11.createFrame("SEND");
@@ -2374,6 +2378,26 @@ public class StompV11Test extends StompV11TestBase
}
@Test
+ public void testSendMessageToNonExistentJmsQueueWithAutoCreation() throws Exception
+ {
+ connV11.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV11.createFrame("SEND");
+ String guid = UUID.randomUUID().toString();
+ frame.addHeader("destination", "jms.queue.NonExistentQueue" + guid);
+ frame.addHeader("receipt", "1234");
+ frame.setBody("Hello World");
+
+ frame = connV11.sendFrame(frame);
+
+ assertTrue(frame.getCommand().equals("RECEIPT"));
+ assertEquals("1234", frame.getHeader("receipt-id"));
+ System.out.println("message: " + frame.getHeader("message"));
+
+ connV11.disconnect();
+ }
+
+ @Test
public void testSendAndReceiveWithEscapedCharactersInSenderId() throws Exception
{
connV11.connect(defUser, defPass);
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/13104422/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java
index 3b6e471..7a308af 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/stomp/v12/StompV12Test.java
@@ -31,6 +31,7 @@ import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.core.settings.impl.AddressSettings;
import org.apache.activemq.tests.integration.IntegrationTestLogger;
import org.apache.activemq.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.tests.integration.stomp.util.StompClientConnection;
@@ -400,6 +401,10 @@ public class StompV12Test extends StompV11TestBase
@Test
public void testHeaderRepetitive() throws Exception
{
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setAutoCreateJmsQueues(false);
+ server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
+
connV12.connect(defUser, defPass);
ClientStompFrame frame = connV12.createFrame("SEND");
@@ -2617,8 +2622,11 @@ public class StompV12Test extends StompV11TestBase
}
@Test
- public void testSendMessageToNonExistentJmsQueue() throws Exception
+ public void testSendMessageToNonExistentJmsQueueWithoutAutoCreation() throws Exception
{
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setAutoCreateJmsQueues(false);
+ server.getActiveMQServer().getAddressSettingsRepository().addMatch("#", addressSettings);
connV12.connect(defUser, defPass);
ClientStompFrame frame = connV12.createFrame("SEND");
@@ -2637,6 +2645,26 @@ public class StompV12Test extends StompV11TestBase
}
@Test
+ public void testSendMessageToNonExistentJmsQueueWithAutoCreation() throws Exception
+ {
+ connV12.connect(defUser, defPass);
+
+ ClientStompFrame frame = connV12.createFrame("SEND");
+ String guid = UUID.randomUUID().toString();
+ frame.addHeader("destination", "jms.queue.NonExistentQueue" + guid);
+ frame.addHeader("receipt", "1234");
+ frame.setBody("Hello World");
+
+ frame = connV12.sendFrame(frame);
+
+ assertTrue(frame.getCommand().equals("RECEIPT"));
+ assertEquals("1234", frame.getHeader("receipt-id"));
+ System.out.println("message: " + frame.getHeader("message"));
+
+ connV12.disconnect();
+ }
+
+ @Test
public void testInvalidStompCommand() throws Exception
{
try