You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/11 19:23:37 UTC
[07/11] activemq-artemis git commit: Stomp refactor + track autocreation for addresses
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 90d18ae..91ab5d3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -42,48 +42,70 @@ import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.FuseMQTTClientProvider;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTClientProvider;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
public class StompTest extends StompTestBase {
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
+ protected StompClientConnection conn;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ try {
+ boolean connected = conn != null && conn.isConnected();
+ log.debug("Connection 1.0 connected: " + connected);
+ if (connected) {
+ conn.disconnect();
+ }
+ } finally {
+ super.tearDown();
+ }
+ }
@Test
public void testConnectionTTL() throws Exception {
- int index = 1;
int port = 61614;
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?connectionTtl=1000").start();
- createBootstrap(index, port);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(index, frame);
- frame = receiveFrame(index, 10000);
-
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ conn.connect("brianm", "wombats");
Thread.sleep(5000);
- assertTrue(receiveFrame(index, 10000).indexOf(Stomp.Responses.ERROR) != -1);
+ ClientStompFrame frame = conn.receiveFrame();
- assertChannelClosed(index);
+ assertEquals(Stomp.Responses.ERROR, frame.getCommand());
+
+ assertFalse(conn.isConnected());
}
@Test
public void testSendManyMessages() throws Exception {
- MessageConsumer consumer = session.createConsumer(queue);
+ conn.connect(defUser, defPass);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
+ MessageConsumer consumer = session.createConsumer(queue);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
int count = 1000;
final CountDownLatch latch = new CountDownLatch(count);
consumer.setMessageListener(new MessageListener() {
@@ -94,11 +116,8 @@ public class StompTest extends StompTestBase {
}
});
- frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
for (int i = 1; i <= count; i++) {
- // Thread.sleep(1);
- // System.out.println(">>> " + i);
- sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
}
assertTrue(latch.await(60, TimeUnit.SECONDS));
@@ -110,11 +129,7 @@ public class StompTest extends StompTestBase {
try {
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
-
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
int count = 1000;
final CountDownLatch latch = new CountDownLatch(count);
consumer.setMessageListener(new MessageListener() {
@@ -125,13 +140,14 @@ public class StompTest extends StompTestBase {
}
});
- ((ActiveMQServerImpl) server.getActiveMQServer()).getMonitor().setMaxUsage(0).tick();
+ ((ActiveMQServerImpl) server.getActiveMQServer()).getMonitor()
+ .setMaxUsage(0)
+ .tick();
- frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
for (int i = 1; i <= count; i++) {
// Thread.sleep(1);
- // System.out.println(">>> " + i);
- sendFrame(frame);
+ // log.info(">>> " + i);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
}
// It should encounter the exception on logs
@@ -140,52 +156,44 @@ public class StompTest extends StompTestBase {
AssertionLoggerHandler.clear();
AssertionLoggerHandler.stopCapture();
}
-
}
@Test
public void testConnect() throws Exception {
-
- String connect_frame = "CONNECT\n" + "login: brianm\n" +
- "passcode: wombats\n" +
- "request-id: 1\n" +
- "\n" +
- Stomp.NULL;
- sendFrame(connect_frame);
-
- String f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
- Assert.assertTrue(f.indexOf("response-id:1") >= 0);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.LOGIN, defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, defPass)
+ .addHeader(Stomp.Headers.Connect.REQUEST_ID, "1");
+ ClientStompFrame response = conn.sendFrame(frame);
+
+ Assert.assertTrue(response.getCommand()
+ .equals(Stomp.Responses.CONNECTED));
+ Assert.assertTrue(response.getHeader(Stomp.Headers.Connected.RESPONSE_ID)
+ .equals("1"));
}
@Test
public void testDisconnectAndError() throws Exception {
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.LOGIN, defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, defPass)
+ .addHeader(Stomp.Headers.Connect.REQUEST_ID, "1");
+ ClientStompFrame response = conn.sendFrame(frame);
- String connectFrame = "CONNECT\n" + "login: brianm\n" +
- "passcode: wombats\n" +
- "request-id: 1\n" +
- "\n" +
- Stomp.NULL;
- sendFrame(connectFrame);
+ Assert.assertTrue(response.getCommand()
+ .equals(Stomp.Responses.CONNECTED));
+ Assert.assertTrue(response.getHeader(Stomp.Headers.Connected.RESPONSE_ID)
+ .equals("1"));
- String f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
- Assert.assertTrue(f.indexOf("response-id:1") >= 0);
-
- String disconnectFrame = "DISCONNECT\n\n" + Stomp.NULL;
- sendFrame(disconnectFrame);
-
- waitForFrameToTakeEffect();
+ conn.disconnect();
// sending a message will result in an error
- String frame = "SEND\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
-
- assertChannelClosed();
+ try {
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World!");
+ fail("Should have thrown an exception since connection is disconnected");
+ } catch (Exception e) {
+ // ignore
+ }
}
@Test
@@ -193,15 +201,9 @@ public class StompTest extends StompTestBase {
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
- frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
-
- sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -218,21 +220,16 @@ public class StompTest extends StompTestBase {
@Test
public void sendSTOMPReceiveMQTT() throws Exception {
- String address = "myTestAddress";
-
// Set up MQTT Subscription
MQTTClientProvider clientProvider = new FuseMQTTClientProvider();
- clientProvider.connect("tcp://localhost:61616");
- clientProvider.subscribe(address, 0);
+ clientProvider.connect("tcp://" + hostname + ":" + port);
+ clientProvider.subscribe(getTopicPrefix() + getTopicName(), 0);
String stompPayload = "This is a test message";
// Set up STOMP connection and send STOMP Message
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "SEND\n" + "destination:" + address + "\n\n" + stompPayload + Stomp.NULL;
- sendFrame(frame);
+ conn.connect(defUser, defPass);
+ send(conn, getTopicPrefix() + getTopicName(), null, stompPayload);
// Receive MQTT Message
byte[] mqttPayload = clientProvider.receive(10000);
@@ -244,44 +241,30 @@ public class StompTest extends StompTestBase {
@Test
public void sendMQTTReceiveSTOMP() throws Exception {
- String address = "myTestAddress";
String payload = "This is a test message";
- server.getActiveMQServer().createQueue(new SimpleString(address), new SimpleString(address), null, false, false);
-
// Set up STOMP subscription
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "SUBSCRIBE\n" + "destination:" + address + "\n" + "ack:auto\n\n" + Stomp.NULL;
- sendFrame(frame);
- receiveFrame(1000);
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
// Send MQTT Message
MQTTClientProvider clientProvider = new FuseMQTTClientProvider();
- clientProvider.connect("tcp://localhost:61616");
- clientProvider.publish(address, payload.getBytes(), 0);
+ clientProvider.connect("tcp://" + hostname + ":" + port);
+ clientProvider.publish(getQueuePrefix() + getQueueName(), payload.getBytes(), 0);
clientProvider.disconnect();
// Receive STOMP Message
- frame = receiveFrame(1000);
- assertTrue(frame.contains(payload));
+ ClientStompFrame frame = conn.receiveFrame();
+ assertTrue(frame.getBody()
+ .contains(payload));
}
@Test
public void testSendMessageToNonExistentQueue() throws Exception {
String nonExistentQueue = RandomUtil.randomString();
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SEND\n" + "destination:" + getQueuePrefix() + nonExistentQueue + "\n\n" + "Hello World" + Stomp.NULL;
-
- sendFrame(frame);
- receiveFrame(1000);
+ conn.connect(defUser, defPass);
+ send(conn, getQueuePrefix() + nonExistentQueue, null, "Hello World", true, AddressInfo.RoutingType.ANYCAST);
MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue(nonExistentQueue));
TextMessage message = (TextMessage) consumer.receive(1000);
@@ -297,29 +280,26 @@ public class StompTest extends StompTestBase {
Assert.assertTrue(Math.abs(tnow - tmsg) < 1500);
// closing the consumer here should trigger auto-deletion
- assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue)));
+ assertNotNull(server.getActiveMQServer()
+ .getPostOffice()
+ .getBinding(new SimpleString(nonExistentQueue)));
consumer.close();
- assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue)));
+ assertNull(server.getActiveMQServer()
+ .getPostOffice()
+ .getBinding(new SimpleString(nonExistentQueue)));
}
@Test
public void testSendMessageToNonExistentTopic() throws Exception {
String nonExistentTopic = RandomUtil.randomString();
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
// first send a message to ensure that sending to a non-existent topic won't throw an error
- frame = "SEND\n" + "destination:" + getTopicPrefix() + nonExistentTopic + "\n\n" + "Hello World" + Stomp.NULL;
- sendFrame(frame);
- receiveFrame(1000);
+ send(conn, getTopicPrefix() + nonExistentTopic, null, "Hello World", true);
// create a subscription on the topic and send/receive another message
MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createTopic(nonExistentTopic));
- sendFrame(frame);
- receiveFrame(1000);
+ send(conn, getTopicPrefix() + nonExistentTopic, null, "Hello World", true);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
@@ -332,11 +312,13 @@ public class StompTest extends StompTestBase {
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1500);
- assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentTopic)));
+ assertNotNull(server.getActiveMQServer()
+ .getAddressInfo(new SimpleString(nonExistentTopic)));
- // closing the consumer here should trigger auto-deletion of the topic
+ // closing the consumer here should trigger auto-deletion of the subscription queue and address
consumer.close();
- assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentTopic)));
+ assertNull(server.getActiveMQServer()
+ .getAddressInfo(new SimpleString(nonExistentTopic)));
}
/*
@@ -346,29 +328,22 @@ public class StompTest extends StompTestBase {
*/
@Test
public void testSendMessageWithLeadingNewLine() throws Exception {
+ conn.connect(defUser, defPass);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .setBody("Hello World");
+ conn.sendWickedFrame(frame);
MessageConsumer consumer = session.createConsumer(queue);
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL + "\n";
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SEND\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n\n" +
- "Hello World" +
- Stomp.NULL +
- "\n";
-
- sendFrame(frame);
-
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
+ message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+ Assert.assertEquals("Hello World", message.getText());
+
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
@@ -380,25 +355,9 @@ public class StompTest extends StompTestBase {
public void testSendMessageWithReceipt() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
- frame = "SEND\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "receipt: 1234\n\n" +
- "Hello World" +
- Stomp.NULL;
-
- sendFrame(frame);
-
- String f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("RECEIPT"));
- Assert.assertTrue(f.indexOf("receipt-id:1234") >= 0);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -413,30 +372,19 @@ public class StompTest extends StompTestBase {
@Test
public void testSendMessageWithContentLength() throws Exception {
-
MessageConsumer consumer = session.createConsumer(queue);
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
byte[] data = new byte[]{1, 0, 0, 4};
-
- frame = "SEND\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "content-length:" +
- data.length +
- "\n\n";
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- baos.write(frame.getBytes(StandardCharsets.UTF_8));
baos.write(data);
- baos.write('\0');
baos.flush();
- sendFrame(new String(baos.toByteArray()));
+
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader(Stomp.Headers.CONTENT_LENGTH, Integer.toString(data.length))
+ .setBody(new String(baos.toByteArray()));
+ conn.sendFrame(frame);
BytesMessage message = (BytesMessage) consumer.receive(10000);
Assert.assertNotNull(message);
@@ -449,30 +397,22 @@ public class StompTest extends StompTestBase {
@Test
public void testJMSXGroupIdCanBeSet() throws Exception {
-
+ final String jmsxGroupID = "JMSXGroupID";
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
- frame = "SEND\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "JMSXGroupID: TEST\n\n" +
- "Hello World" +
- Stomp.NULL;
-
- sendFrame(frame);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader("JMSXGroupID", jmsxGroupID)
+ .setBody("Hello World");
+ conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("Hello World", message.getText());
// differ from StompConnect
- Assert.assertEquals("TEST", message.getStringProperty("JMSXGroupID"));
+ Assert.assertEquals(jmsxGroupID, message.getStringProperty("JMSXGroupID"));
}
@Test
@@ -480,22 +420,14 @@ public class StompTest extends StompTestBase {
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
- frame = "SEND\n" + "foo:abc\n" +
- "bar:123\n" +
- "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
-
- sendFrame(frame);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader("foo", "abc")
+ .addHeader("bar", "123")
+ .setBody("Hello World");
+ conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -509,22 +441,14 @@ public class StompTest extends StompTestBase {
MessageConsumer consumer = session.createConsumer(queue, "hyphenated_props:b-ar = '123'");
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SEND\n" + "foo:abc\n" +
- "b-ar:123\n" +
- "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
+ conn.connect(defUser, defPass);
- sendFrame(frame);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader("foo", "abc")
+ .addHeader("b-ar", "123")
+ .setBody("Hello World");
+ conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -538,27 +462,19 @@ public class StompTest extends StompTestBase {
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SEND\n" + "correlation-id:c123\n" +
- "persistent:true\n" +
- "priority:3\n" +
- "type:t345\n" +
- "JMSXGroupID:abc\n" +
- "foo:abc\n" +
- "bar:123\n" +
- "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
+ conn.connect(defUser, defPass);
- sendFrame(frame);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader("foo", "abc")
+ .addHeader("bar", "123")
+ .addHeader("correlation-id", "c123")
+ .addHeader("persistent", "true")
+ .addHeader("type", "t345")
+ .addHeader("JMSXGroupID", "abc")
+ .addHeader("priority", "3")
+ .setBody("Hello World");
+ conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -579,33 +495,25 @@ public class StompTest extends StompTestBase {
public void testSendMessageWithLongHeaders() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < 1024; i++) {
buffer.append("a");
}
- String longHeader = "longHeader:" + buffer.toString() + "\n";
-
- frame = "SEND\n" + "correlation-id:c123\n" +
- "persistent:true\n" +
- "priority:3\n" +
- "type:t345\n" +
- "JMSXGroupID:abc\n" +
- "foo:abc\n" +
- longHeader +
- "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
-
- sendFrame(frame);
+
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader("foo", "abc")
+ .addHeader("bar", "123")
+ .addHeader("correlation-id", "c123")
+ .addHeader("persistent", "true")
+ .addHeader("type", "t345")
+ .addHeader("JMSXGroupID", "abc")
+ .addHeader("priority", "3")
+ .addHeader("longHeader", buffer.toString())
+ .setBody("Hello World");
+ conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -615,37 +523,30 @@ public class StompTest extends StompTestBase {
Assert.assertEquals("getJMSPriority", 3, message.getJMSPriority());
Assert.assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode());
Assert.assertEquals("foo", "abc", message.getStringProperty("foo"));
- Assert.assertEquals("longHeader", 1024, message.getStringProperty("longHeader").length());
+ Assert.assertEquals("longHeader", 1024, message.getStringProperty("longHeader")
+ .length());
Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
}
@Test
public void testSubscribeWithAutoAck() throws Exception {
+ conn.connect(defUser, defPass);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
- sendFrame(frame);
+ sendJmsMessage(getName());
- sendMessage(getName());
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+ Assert.assertEquals(getName(), frame.getBody());
- frame = receiveFrame(10000);
- System.out.println("-------- frame received: " + frame);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
-
- Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
- Matcher cl_matcher = cl.matcher(frame);
+ Pattern cl = Pattern.compile(Stomp.Headers.CONTENT_LENGTH + ":\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame.toString());
Assert.assertFalse(cl_matcher.find());
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
// message should not be received as it was auto-acked
MessageConsumer consumer = session.createConsumer(queue);
@@ -656,48 +557,32 @@ public class StompTest extends StompTestBase {
@Test
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
byte[] payload = new byte[]{1, 2, 3, 4, 5};
- sendMessage(payload, queue);
- frame = receiveFrame(10000);
+ sendJmsMessage(payload, queue);
- System.out.println("Message: " + frame);
+ ClientStompFrame frame = conn.receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
- Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
- Matcher cl_matcher = cl.matcher(frame);
+ Pattern cl = Pattern.compile(Stomp.Headers.CONTENT_LENGTH + ":\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
+ Matcher cl_matcher = cl.matcher(frame.toString());
Assert.assertTrue(cl_matcher.find());
Assert.assertEquals("5", cl_matcher.group(1));
- Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
- Assert.assertTrue(frame.indexOf(new String(payload)) > -1);
+ Assert.assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame.toString()).find());
+ Assert.assertTrue(frame.getBody().toString().indexOf(new String(payload)) > -1);
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
@Test
public void testSubscribeWithMessageSentWithProperties() throws Exception {
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
MessageProducer producer = session.createProducer(queue);
BytesMessage message = session.createBytesMessage();
@@ -712,79 +597,55 @@ public class StompTest extends StompTestBase {
message.writeBytes("Hello World".getBytes(StandardCharsets.UTF_8));
producer.send(message);
- frame = receiveFrame(10000);
+ ClientStompFrame frame = conn.receiveFrame(10000);
Assert.assertNotNull(frame);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("S:") > 0);
- Assert.assertTrue(frame.indexOf("n:") > 0);
- Assert.assertTrue(frame.indexOf("byte:") > 0);
- Assert.assertTrue(frame.indexOf("d:") > 0);
- Assert.assertTrue(frame.indexOf("f:") > 0);
- Assert.assertTrue(frame.indexOf("i:") > 0);
- Assert.assertTrue(frame.indexOf("l:") > 0);
- Assert.assertTrue(frame.indexOf("s:") > 0);
- Assert.assertTrue(frame.indexOf("Hello World") > 0);
-
- // System.out.println("out: "+frame);
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals("value", frame.getHeader("S"));
+ Assert.assertEquals("false", frame.getHeader("n"));
+ Assert.assertEquals("9", frame.getHeader("byte"));
+ Assert.assertEquals("2.0", frame.getHeader("d"));
+ Assert.assertEquals("6.0", frame.getHeader("f"));
+ Assert.assertEquals("10", frame.getHeader("i"));
+ Assert.assertEquals("121", frame.getHeader("l"));
+ Assert.assertEquals("12", frame.getHeader("s"));
+ Assert.assertEquals("Hello World", frame.getBody());
+
+ conn.disconnect();
}
@Test
public void testSubscribeWithID() throws Exception {
+ conn.connect(defUser, defPass);
+ subscribe(conn, "mysubid", Stomp.Headers.Subscribe.AckModeValues.AUTO);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
+ sendJmsMessage(getName());
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Message.DESTINATION));
+ Assert.assertEquals("mysubid", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
+ Assert.assertEquals(getName(), frame.getBody());
- frame = "SUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "ack:auto\n" +
- "id: mysubid\n\n" +
- Stomp.NULL;
- sendFrame(frame);
-
- sendMessage(getName());
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf("subscription:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
+ //
@Test
public void testBodyWithUTF8() throws Exception {
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
- System.out.println(text);
- sendMessage(text);
+ log.info(text);
+ sendJmsMessage(text);
- frame = receiveFrame(10000);
- System.out.println(frame);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(text) > 0);
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ log.info(frame);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Message.DESTINATION));
+ Assert.assertEquals(text, frame.getBody());
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
@Test
@@ -792,88 +653,54 @@ public class StompTest extends StompTestBase {
int ctr = 10;
String[] data = new String[ctr];
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
for (int i = 0; i < ctr; ++i) {
data[i] = getName() + i;
- sendMessage(data[i]);
+ sendJmsMessage(data[i]);
}
for (int i = 0; i < ctr; ++i) {
- frame = receiveFrame(1000);
- Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+ ClientStompFrame frame = conn.receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
}
// sleep a while before publishing another set of messages
- waitForFrameToTakeEffect();
+ Thread.sleep(200);
for (int i = 0; i < ctr; ++i) {
- data[i] = getName() + ":second:" + i;
- sendMessage(data[i]);
+ data[i] = getName() + Stomp.Headers.SEPARATOR + "second:" + i;
+ sendJmsMessage(data[i]);
}
for (int i = 0; i < ctr; ++i) {
- frame = receiveFrame(1000);
- Assert.assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
+ ClientStompFrame frame = conn.receiveFrame(1000);
+ Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
}
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
@Test
public void testSubscribeWithAutoAckAndSelector() throws Exception {
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'");
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "selector: foo = 'zzz'\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ sendJmsMessage("Ignored message", "foo", "1234");
+ sendJmsMessage("Real message", "foo", "zzz");
- sendMessage("Ignored message", "foo", "1234");
- sendMessage("Real message", "foo", "zzz");
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message"));
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
@Test
public void testSubscribeWithAutoAckAndHyphenatedSelector() throws Exception {
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "selector: hyphenated_props:foo-bar = 'zzz'\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "hyphenated_props:foo-bar = 'zzz'");
ServerLocator serverLocator = addServerLocator(ActiveMQClient.createServerLocator("vm://0"));
ClientSessionFactory clientSessionFactory = serverLocator.createSessionFactory();
@@ -891,40 +718,26 @@ public class StompTest extends StompTestBase {
producer.send(ignoredMessage);
producer.send(realMessage);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message"));
+
+ conn.disconnect();
}
@Test
public void testSubscribeWithClientAck() throws Exception {
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
+ sendJmsMessage(getName());
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
+ ack(conn, null, frame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
-
- sendFrame(frame);
-
- sendMessage(getName());
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
- Matcher cl_matcher = cl.matcher(frame);
- Assert.assertTrue(cl_matcher.find());
- String messageID = cl_matcher.group(1);
-
- frame = "ACK\n" + "message-id: " + messageID + "\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
// message should not be received since message was acknowledged by the client
MessageConsumer consumer = session.createConsumer(queue);
@@ -934,23 +747,14 @@ public class StompTest extends StompTestBase {
@Test
public void testRedeliveryWithClientAck() throws Exception {
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
+ sendJmsMessage(getName());
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
-
- sendFrame(frame);
-
- sendMessage(getName());
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
// message should be received since message was not acknowledged
MessageConsumer consumer = session.createConsumer(queue);
@@ -971,149 +775,131 @@ public class StompTest extends StompTestBase {
protected void assertSubscribeWithClientAckThenConsumeWithAutoAck(boolean sendDisconnect) throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
+// String frame = Stomp.Commands.SUBSCRIBE + "\n" +
+// Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE + Stomp.Headers.SEPARATOR + AddressInfo.RoutingType.ANYCAST + "\n" +
+// Stomp.Headers.Send.DESTINATION + Stomp.Headers.SEPARATOR + getQueuePrefix() + getQueueName() + "\n" +
+// Stomp.Headers.Message.ACK + Stomp.Headers.SEPARATOR + "client\n\n" +
+// Stomp.NULL;
+//
+// sendFrame(frame);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
+ sendJmsMessage(getName());
- sendFrame(frame);
- sendMessage(getName());
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+// frame = receiveFrame(10000);
+// Assert.assertTrue(frame.startsWith(Stomp.Responses.MESSAGE));
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
log.info("Reconnecting!");
if (sendDisconnect) {
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- waitForFrameToTakeEffect();
- reconnect();
+ conn.disconnect();
+// reconnect();
+ conn.destroy();
+ conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
} else {
- reconnect(100);
- waitForFrameToTakeEffect();
+// reconnect(100);
+// waitForFrameToTakeEffect();
+ conn.destroy();
+ conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
}
// message should be received since message was not acknowledged
- frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + Stomp.NULL;
+// frame = Stomp.Commands.SUBSCRIBE + "\n" +
+// Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE + Stomp.Headers.SEPARATOR + AddressInfo.RoutingType.ANYCAST + "\n" +
+// Stomp.Headers.Subscribe.DESTINATION + Stomp.Headers.SEPARATOR + getQueuePrefix() + getQueueName() + "\n\n" +
+// Stomp.NULL;
+//
+// sendFrame(frame);
+ subscribe(conn, null);
- sendFrame(frame);
+// frame = receiveFrame(10000);
+// log.info(frame);
+// Assert.assertTrue(frame.startsWith(Stomp.Responses.MESSAGE));
+ frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ conn.disconnect();
+ conn.destroy();
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- waitForFrameToTakeEffect();
+ conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
// now let's make sure we don't see the message again
- reconnect();
-
- frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
+// reconnect();
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
- frame = "SUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "receipt: 1234\n\n" +
- Stomp.NULL;
+// frame = Stomp.Commands.SUBSCRIBE + "\n" +
+// Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE + Stomp.Headers.SEPARATOR + AddressInfo.RoutingType.ANYCAST + "\n" +
+// Stomp.Headers.Send.DESTINATION + Stomp.Headers.SEPARATOR + getQueuePrefix() + getQueueName() + "\n" +
+// Stomp.Headers.RECEIPT_REQUESTED + Stomp.Headers.SEPARATOR + " 1234\n\n" +
+// Stomp.NULL;
+//
+// sendFrame(frame);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, true);
- sendFrame(frame);
// wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
-
- sendMessage("shouldBeNextMessage");
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- System.out.println(frame);
- Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+// frame = receiveFrame(10000);
+// Assert.assertTrue(frame.startsWith(Stomp.Responses.RECEIPT));
+
+ sendJmsMessage("shouldBeNextMessage");
+
+// frame = receiveFrame(10000);
+// Assert.assertTrue(frame.startsWith(Stomp.Responses.MESSAGE));
+// log.info(frame);
+// Assert.assertTrue(frame.contains("shouldBeNextMessage"));
+ frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals("shouldBeNextMessage", frame.getBody());
}
+
@Test
public void testUnsubscribe() throws Exception {
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
// send a message to our queue
- sendMessage("first message");
+ sendJmsMessage("first message");
- // receive message from socket
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ // receive message
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
// remove subscription
- frame = "UNSUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "receipt:567\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ unsubscribe(conn, null, getQueuePrefix() + getQueueName(), true, false);
// send a message to our queue
- sendMessage("second message");
+ sendJmsMessage("second message");
- frame = receiveFrame(1000);
+ frame = conn.receiveFrame(1000);
log.info("Received frame: " + frame);
Assert.assertNull("No message should have been received since subscription was removed", frame);
}
@Test
public void testUnsubscribeWithID() throws Exception {
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "id: mysubid\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(frame);
+ conn.connect(defUser, defPass);
+ subscribe(conn, "mysubid", Stomp.Headers.Subscribe.AckModeValues.AUTO);
// send a message to our queue
- sendMessage("first message");
+ sendJmsMessage("first message");
// receive message from socket
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
// remove subscription
- frame = "UNSUBSCRIBE\n" + "id:mysubid\n" + "receipt: 345\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ unsubscribe(conn, "mysubid", null, true, false);
// send a message to our queue
- sendMessage("second message");
+ sendJmsMessage("second message");
- frame = receiveFrame(1000);
+ frame = conn.receiveFrame(1000);
log.info("Received frame: " + frame);
Assert.assertNull("No message should have been received since subscription was removed", frame);
@@ -1122,34 +908,15 @@ public class StompTest extends StompTestBase {
@Test
public void testTransactionCommit() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
+ conn.connect(defUser, defPass);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
-
- frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "SEND\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "transaction: tx1\n" +
- "receipt: 123\n" +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ beginTransaction(conn, "tx1");
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, "tx1");
// check the message is not committed
assertNull(consumer.receive(100));
- frame = "COMMIT\n" + "transaction: tx1\n" + "receipt:456\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ commitTransaction(conn, "tx1", true);
Message message = consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
@@ -1158,49 +925,20 @@ public class StompTest extends StompTestBase {
@Test
public void testSuccessiveTransactionsWithSameID() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
// first tx
- frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "SEND\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "transaction: tx1\n" +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
-
- frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ beginTransaction(conn, "tx1");
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, "tx1");
+ commitTransaction(conn, "tx1");
Message message = consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
// 2nd tx with same tx ID
- frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "SEND\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "transaction: tx1\n" +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
-
- frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ beginTransaction(conn, "tx1");
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true, null, "tx1");
+ commitTransaction(conn, "tx1");
message = consumer.receive(1000);
Assert.assertNotNull("Should have received a message", message);
@@ -1208,67 +946,27 @@ public class StompTest extends StompTestBase {
@Test
public void testBeginSameTransactionTwice() throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
-
- frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- // begin the tx a 2nd time
- frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("ERROR"));
+ conn.connect(defUser, defPass);
+ beginTransaction(conn, "tx1");
+ beginTransaction(conn, "tx1");
+ ClientStompFrame frame = conn.receiveFrame(1000);
+ Assert.assertEquals(Stomp.Responses.ERROR, frame.getCommand());
}
@Test
public void testTransactionRollback() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
+ String txId = "tx1";
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- String f = receiveFrame(1000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
-
- frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "SEND\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "transaction: tx1\n" +
- "\n" +
- "first message" +
- Stomp.NULL;
- sendFrame(frame);
-
- // rollback first message
- frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "SEND\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "transaction: tx1\n" +
- "\n" +
- "second message" +
- Stomp.NULL;
- sendFrame(frame);
-
- frame = "COMMIT\n" + "transaction: tx1\n" + "receipt:789\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
+ conn.connect(defUser, defPass);
+ beginTransaction(conn, txId);
+ send(conn, getQueuePrefix() + getQueueName(), null, "first message", true, null, txId);
+ abortTransaction(conn, txId);
+
+ beginTransaction(conn, txId);
+ send(conn, getQueuePrefix() + getQueueName(), null, "second message", true, null, txId);
+ commitTransaction(conn, txId);
// only second msg should be received since first msg was rolled back
TextMessage message = (TextMessage) consumer.receive(1000);
@@ -1280,91 +978,52 @@ public class StompTest extends StompTestBase {
public void testSubscribeToTopic() throws Exception {
final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
- frame = "SUBSCRIBE\n" + "destination:" +
- getTopicPrefix() +
- getTopicName() +
- "\n" +
- "receipt: 12\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ subscribeTopic(conn, null, null, null, true);
assertTrue("Subscription queue should be created here", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
- if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) {
+ int length = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
+ if (length - baselineQueueCount == 1) {
return true;
} else {
+ log.info("Queue count: " + (length - baselineQueueCount));
return false;
}
}
- }, TimeUnit.SECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100)));
-
- sendMessage(getName(), topic);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
-
- frame = "UNSUBSCRIBE\n" + "destination:" +
- getTopicPrefix() +
- getTopicName() +
- "\n" +
- "receipt: 1234\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for UNSUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
-
- sendMessage(getName(), topic);
-
- frame = receiveFrame(1000);
+ }, TimeUnit.SECONDS.toMillis(10), TimeUnit.MILLISECONDS.toMillis(100)));
+
+ sendJmsMessage(getName(), topic);
+
+ ClientStompFrame frame = conn.receiveFrame(1000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+ Assert.assertEquals(getName(), frame.getBody());
+
+ unsubscribe(conn, null, getTopicPrefix() + getTopicName(), true, false);
+
+ sendJmsMessage(getName(), topic);
+
+ frame = conn.receiveFrame(1000);
log.info("Received frame: " + frame);
Assert.assertNull("No message should have been received since subscription was removed", frame);
assertEquals("Subscription queue should be deleted", 0, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount);
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
-
+ //
@Test
public void testSubscribeToQueue() throws Exception {
final int baselineQueueCount = server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length;
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "receipt: 12\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, null, null, true);
assertFalse("Queue should not be created here", Wait.waitFor(new Wait.Condition() {
-
@Override
public boolean isSatisfied() throws Exception {
if (server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length - baselineQueueCount == 1) {
@@ -1375,65 +1034,39 @@ public class StompTest extends StompTestBase {
}
}, TimeUnit.MILLISECONDS.toMillis(1000), TimeUnit.MILLISECONDS.toMillis(100)));
- sendMessage(getName(), queue);
+ sendJmsMessage(getName(), queue);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ ClientStompFrame frame = conn.receiveFrame(1000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+ Assert.assertEquals(getName(), frame.getBody());
- frame = "UNSUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "receipt: 1234\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for UNSUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ unsubscribe(conn, null, getQueuePrefix() + getQueueName(), true, false);
- sendMessage(getName(), queue);
+ sendJmsMessage(getName(), queue);
- frame = receiveFrame(1000);
+ frame = conn.receiveFrame(1000);
log.info("Received frame: " + frame);
Assert.assertNull("No message should have been received since subscription was removed", frame);
assertEquals("Subscription queue should not be deleted", baselineQueueCount, server.getActiveMQServer().getActiveMQServerControl().getQueueNames().length);
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
@Test
public void testSubscribeToNonExistentQueue() throws Exception {
String nonExistentQueue = RandomUtil.randomString();
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, null, null, null, getQueuePrefix() + nonExistentQueue, true);
- frame = "SUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- nonExistentQueue +
- "\n" +
- "receipt: 12\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
-
- sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue));
+ sendJmsMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue));
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ ClientStompFrame frame = conn.receiveFrame(1000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals(getQueuePrefix() + nonExistentQueue, frame.getHeader(Stomp.Headers.Send.DESTINATION));
+ Assert.assertEquals(getName(), frame.getBody());
assertNotNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue)));
@@ -1449,322 +1082,160 @@ public class StompTest extends StompTestBase {
}
}, 1000, 50));
- frame = "UNSUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- nonExistentQueue +
- "\n" +
- "receipt: 1234\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for UNSUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ unsubscribe(conn, null, getQueuePrefix() + nonExistentQueue, true, false);
assertNull(server.getActiveMQServer().getPostOffice().getBinding(new SimpleString(nonExistentQueue)));
- sendMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue));
+ sendJmsMessage(getName(), ActiveMQJMSClient.createQueue(nonExistentQueue));
- frame = receiveFrame(1000);
+ frame = conn.receiveFrame(1000);
log.info("Received frame: " + frame);
Assert.assertNull("No message should have been received since subscription was removed", frame);
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
@Test
public void testDurableSubscriberWithReconnection() throws Exception {
+ conn.connect(defUser, defPass, "myclientid");
+ subscribeTopic(conn, null, null, getName());
+
+ conn.disconnect();
- String connectFame = "CONNECT\n" + "login: brianm\n" +
- "passcode: wombats\n" +
- "client-id: myclientid\n\n" +
- Stomp.NULL;
- sendFrame(connectFame);
-
- String frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
- getTopicPrefix() +
- getTopicName() +
- "\n" +
- "durable-subscriber-name: " +
- getName() +
- "\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(subscribeFrame);
- waitForFrameToTakeEffect();
-
- String disconnectFrame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(disconnectFrame);
- waitForFrameToTakeEffect();
+ Thread.sleep(500);
// send the message when the durable subscriber is disconnected
- sendMessage(getName(), topic);
-
- reconnect(100);
- sendFrame(connectFame);
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- sendFrame(subscribeFrame);
-
- // we must have received the message
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
-
- String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" +
- getTopicPrefix() +
- getTopicName() +
- "\n" +
- "receipt: 1234\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(unsubscribeFrame);
- // wait for UNSUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
-
- sendFrame(disconnectFrame);
+ sendJmsMessage(getName(), topic);
+
+ conn.destroy();
+ conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ conn.connect(defUser, defPass, "myclientid");
+
+ subscribeTopic(conn, null, null, getName());
+
+ ClientStompFrame frame = conn.receiveFrame(3000);
+ assertNotNull("Should have received a message from the durable subscription", frame);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+ Assert.assertEquals(getName(), frame.getBody());
+
+ unsubscribe(conn, null, getTopicPrefix() + getTopicName(), true, true);
+
+ conn.disconnect();
}
@Test
public void testDurableSubscriber() throws Exception {
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
- getTopicPrefix() +
- getTopicName() +
- "\n" +
- "receipt: 12\n" +
- "durable-subscriber-name: " +
- getName() +
- "\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(subscribeFrame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ conn.connect(defUser, defPass, "myclientid");
+ subscribeTopic(conn, null, null, getName(), true);
+ ClientStompFrame response = subscribeTopic(conn, null, null, getName(), true);
// creating a subscriber with the same durable-subscriber-name must fail
- sendFrame(subscribeFrame);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("ERROR"));
+ Assert.assertEquals(Stomp.Responses.ERROR, response.getCommand());
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
@Test
public void testDurableUnSubscribe() throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(1000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- String subscribeFrame = "SUBSCRIBE\n" + "destination:" +
- getTopicPrefix() +
- getTopicName() +
- "\n" +
- "receipt: 12\n" +
- "durable-subscriber-name: " +
- getName() +
- "\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(subscribeFrame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(1000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- waitForFrameToTakeEffect();
+ conn.connect(defUser, defPass, "myclientid");
+ subscribeTopic(conn, null, null, getName(), true);
+ conn.disconnect();
+ Thread.sleep(500);
assertNotNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
- reconnect(100);
- frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "client-id: myclientid\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(1000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn.destroy();
+ conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
- String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:" +
- getTopicPrefix() +
- getTopicName() +
- "\n" +
- "durable-subscriber-name: " +
- getName() +
- "\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(unsubscribeFrame);
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- waitForFrameToTakeEffect();
+ conn.connect(defUser, defPass, "myclientid");
+ unsubscribe(conn, getName(), getTopicPrefix() + getTopicName(), false, true);
+ conn.disconnect();
+ Thread.sleep(500);
assertNull(server.getActiveMQServer().locateQueue(SimpleString.toSimpleString("myclientid." + getName())));
}
@Test
public void testSubscribeToTopicWithNoLocal() throws Exception {
+ conn.connect(defUser, defPass);
+ subscribeTopic(conn, null, null, null, true, true);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" +
- getTopicPrefix() +
- getTopicName() +
- "\n" +
- "receipt: 12\n" +
- "no-local: true\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
-
- // send a message on the same connection => it should not be received
- frame = "SEND\n" + "destination:" + getTopicPrefix() + getTopicName() + "\n\n" + "Hello World" + Stomp.NULL;
- sendFrame(frame);
+ // send a message on the same connection => it should not be received if noLocal = true on subscribe
+ send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
- frame = receiveFrame(2000);
+ ClientStompFrame frame = conn.receiveFrame(2000);
log.info("Received frame: " + frame);
Assert.assertNull("No message should have been received since subscription was removed", frame);
// send message on another JMS connection => it should be received
- sendMessage(getName(), topic);
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ sendJmsMessage(getName(), topic);
+ frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+ Assert.assertEquals(getName(), frame.getBody());
+
+ conn.disconnect();
}
@Test
public void testTopicExistsAfterNoUnsubscribeDisconnect() throws Exception {
+ conn.connect(defUser, defPass);
+ subscribeTopic(conn, null, null, null, true);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ // disconnect, _without unsubscribing_
+ conn.disconnect();
- frame = "SUBSCRIBE\n" + "destination:" +
- getTopicPrefix() +
- getTopicName() +
- "\n" +
- "receipt: 12\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ Thread.sleep(500);
- // disconnect, _without unsubscribing_
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- waitForFrameToTakeEffect();
+ conn.destroy();
// connect again
- reconnect();
- frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ conn.connect(defUser, defPass);
// send a receipted message to the topic
- frame = "SEND\n" + "destination:" + getTopicPrefix() + getTopicName() + "\nreceipt:42\n\n\n" + "Hello World" + Stomp.NULL;
- sendFrame(frame);
-
- // the topic should exist and receive the message, and we should get the requested receipt
- frame = receiveFrame(2000);
- log.info("Received frame: " + frame);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ ClientStompFrame response = send(conn, getTopicPrefix() + getTopicName(), null, "Hello World", true);
+ assertEquals(Stomp.Responses.RECEIPT, response.getCommand());
// ...and nothing else
- frame = receiveFrame(2000);
+ ClientStompFrame frame = conn.receiveFrame(2000);
log.info("Received frame: " + frame);
Assert.assertNull(frame);
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
@Test
public void testClientAckNotPartOfTransaction() throws Exception {
+ conn.connect(defUser, defPass);
+ subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "ack:client\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
-
- sendMessage(getName());
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
- Assert.assertTrue(frame.indexOf("message-id:") > 0);
- Pattern cl = Pattern.compile("message-id:\\s*(\\S+)", Pattern.CASE_INSENSITIVE);
- Matcher cl_matcher = cl.matcher(frame);
- Assert.assertTrue(cl_matcher.find());
- String messageID = cl_matcher.group(1);
-
- frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ sendJmsMessage(getName());
- frame = "ACK\n" + "message-id:" + messageID + "\n" + "transaction: tx1\n" + "\n" + "second message" + Stomp.NULL;
- sendFrame(frame);
+ ClientStompFrame frame = conn.receiveFrame(10000);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+ Assert.assertNotNull(messageID);
+ Assert.assertEquals(getName(), frame.getBody());
- frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ beginTransaction(conn, "tx1");
+ ack(conn, null, messageID, "tx1");
+ abortTransaction(conn, "tx1");
- frame = receiveFrame(1000);
+ frame = conn.receiveFrame(1000);
Assert.assertNull("No message should have been received as the message was acked even though the transaction has been aborted", frame);
- frame = "UNSUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ unsubscribe(conn, null, getQueuePrefix() + getQueueName(), false, false);
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
// HORNETQ-1007
@Test
public void testMultiProtocolConsumers() throws Exception {
- final int TIME_OUT = 5000;
+ final int TIME_OUT = 2000;
int count = 1000;
@@ -1773,21 +1244,8 @@ public class StompTest extends StompTestBase {
MessageConsumer consumer2 = session.createConsumer(topic);
// connect and subscribe STOMP consumer
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(TIME_OUT);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
- frame = "SUBSCRIBE\n" + "destination:" +
- getTopicPrefix() +
- getTopicName() +
- "\n" +
- "receipt: 12\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for SUBSCRIBE's receipt
- frame = receiveFrame(TIME_OUT);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
+ conn.connect(defUser, defPass);
+ subscribeTopic(conn, null, null, null, true);
MessageProducer producer = session.createProducer(topic);
TextMessage message = session.createTextMessage(getName());
@@ -1796,58 +1254,37 @@ public class StompTest extends StompTestBase {
producer.send(message);
Assert.assertNotNull(consumer1.receive(TIME_OUT));
Assert.assertNotNull(consumer2.receive(TIME_OUT));
- frame = receiveFrame(TIME_OUT);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- Assert.assertTrue(frame.indexOf(getName()) > 0);
+ ClientStompFrame frame = conn.receiveFrame(TIME_OUT);
+ Assert.assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ Assert.assertEquals(getTopicPrefix() + getTopicName(), frame.getHeader(Stomp.Headers.Send.DESTINATION));
+ Assert.assertEquals(getName(), frame.getBody());
}
consumer1.close();
consumer2.close();
- frame = "UNSUBSCRIBE\n" + "destination:" +
- getTopicPrefix() +
- getTopicName() +
- "\n" +
- "receipt: 1234\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- // wait for UNSUBSCRIBE's receipt
- frame = receiveFrame(TIME_OUT);
- Assert.assertTrue(frame.startsWith("RECEIPT"));
-
- sendMessage(getName(), topic);
-
- frame = receiveFrame(TIME_OUT);
+ unsubscribe(conn, null, getTopicPrefix() + getTopicName(), true, false);
+
+ sendJmsMessage(getName(), topic);
+
+ ClientStompFrame frame = conn.receiveFrame(TIME_OUT);
log.info("Received frame: " + frame);
Assert.assertNull("No message should have been received since subscription was removed", frame);
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
@Test
//stomp should return an ERROR when acking a non-existent message
public void testUnexpectedAck() throws Exception {
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
String messageID = "888888";
- frame = "ACK\n" + "message-id:" + messageID + "\n" + "\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(100000);
- assertNotNull(frame);
+ conn.connect(defUser, defPass);
+ ack(conn, null, messageID, null);
- System.out.println("received frame: " + frame);
- assertTrue(frame.startsWith("ERROR"));
+ ClientStompFrame frame = conn.receiveFrame(1000);
+ assertNotNull(frame);
+ assertEquals(Stomp.Responses.ERROR, frame.getCommand());
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
+ conn.disconnect();
}
-
}