You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/07 18:18:24 UTC
[21/55] [abbrv] activemq-artemis git commit: Stomp refactor + track
autocreation for addresses
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/58934fed/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index 407df80..d4158ac 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -32,8 +32,9 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
@@ -46,28 +47,28 @@ import org.junit.Test;
/*
*
*/
-public class StompV11Test extends StompV11TestBase {
+public class StompV11Test extends StompTestBase {
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
public static final String CLIENT_ID = "myclientid";
- private StompClientConnection connV11;
+ private StompClientConnection conn;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
}
@Override
@After
public void tearDown() throws Exception {
try {
- boolean connected = connV11 != null && connV11.isConnected();
- log.debug("Connection 11 : " + connected);
+ boolean connected = conn != null && conn.isConnected();
+ log.debug("Connection 1.1 : " + connected);
if (connected) {
- connV11.disconnect();
+ conn.disconnect();
}
} finally {
super.tearDown();
@@ -115,276 +116,234 @@ public class StompV11Test extends StompV11TestBase {
conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
ClientStompFrame frame = conn.connect("invaliduser", defPass);
assertFalse(conn.isConnected());
- assertTrue("ERROR".equals(frame.getCommand()));
+ assertTrue(Stomp.Responses.ERROR.equals(frame.getCommand()));
assertTrue(frame.getBody().contains("Security Error occurred"));
}
@Test
public void testNegotiation() throws Exception {
// case 1 accept-version absent. It is a 1.0 connect
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- ClientStompFrame reply = connV11.sendFrame(frame);
+ ClientStompFrame reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
//reply headers: version, session, server
assertEquals(null, reply.getHeader("version"));
- connV11.disconnect();
+ conn.disconnect();
// case 2 accept-version=1.0, result: 1.0
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.0");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0")
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
//reply headers: version, session, server
assertEquals("1.0", reply.getHeader("version"));
- connV11.disconnect();
+ conn.disconnect();
// case 3 accept-version=1.1, result: 1.1
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.1");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.1")
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
//reply headers: version, session, server
assertEquals("1.1", reply.getHeader("version"));
- connV11.disconnect();
+ conn.disconnect();
// case 4 accept-version=1.0,1.1,1.3, result 1.1
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.0,1.1,1.3");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1,1.3")
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
//reply headers: version, session, server
assertEquals("1.1", reply.getHeader("version"));
- connV11.disconnect();
+ conn.disconnect();
// case 5 accept-version=1.3, result error
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.3");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.3")
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("ERROR", reply.getCommand());
+ assertEquals(Stomp.Responses.ERROR, reply.getCommand());
- System.out.println("Got error frame " + reply);
+ IntegrationTestLogger.LOGGER.info("Got error frame " + reply);
}
@Test
public void testSendAndReceive() throws Exception {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World 1!");
+ conn.connect(defUser, defPass);
- ClientStompFrame response = connV11.sendFrame(frame);
+ ClientStompFrame response = send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!");
assertNull(response);
- frame.addHeader("receipt", "1234");
- frame.setBody("Hello World 2!");
-
- response = connV11.sendFrame(frame);
+ String uuid = UUID.randomUUID().toString();
- assertNotNull(response);
-
- assertEquals("RECEIPT", response.getCommand());
-
- assertEquals("1234", response.getHeader("receipt-id"));
+ response = send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 2!", true);
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
+ subscribe(newConn, "a-sub");
- newConn.sendFrame(subFrame);
-
- frame = newConn.receiveFrame();
+ ClientStompFrame frame = newConn.receiveFrame();
- System.out.println("received " + frame);
+ IntegrationTestLogger.LOGGER.info("received " + frame);
- assertEquals("MESSAGE", frame.getCommand());
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
- assertEquals("a-sub", frame.getHeader("subscription"));
+ assertEquals("a-sub", frame.getHeader(Stomp.Headers.Ack.SUBSCRIPTION));
- assertNotNull(frame.getHeader("message-id"));
+ assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
- assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+ assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Message.DESTINATION));
assertEquals("Hello World 1!", frame.getBody());
frame = newConn.receiveFrame();
- System.out.println("received " + frame);
+ IntegrationTestLogger.LOGGER.info("received " + frame);
- //unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
+ unsubscribe(newConn, "a-sub");
newConn.disconnect();
}
@Test
public void testHeaderContentType() throws Exception {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.setBody("Hello World 1!");
-
- connV11.sendFrame(frame);
+ conn.connect(defUser, defPass);
+ send(conn, getQueuePrefix() + getQueueName(), "application/xml", "Hello World 1!");
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
+ subscribe(newConn, "a-sub");
- newConn.sendFrame(subFrame);
+ ClientStompFrame frame = newConn.receiveFrame();
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
+ IntegrationTestLogger.LOGGER.info("received " + frame);
- assertEquals("MESSAGE", frame.getCommand());
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
- assertEquals("application/xml", frame.getHeader("content-type"));
+ assertEquals("application/xml", frame.getHeader(Stomp.Headers.CONTENT_TYPE));
//unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
+ unsubscribe(newConn, "a-sub");
newConn.disconnect();
}
@Test
public void testHeaderContentLength() throws Exception {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
+ conn.connect(defUser, defPass);
String body = "Hello World 1!";
String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length);
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.addHeader("content-length", cLen);
- frame.setBody(body + "extra");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml")
+ .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen)
+ .setBody(body + "extra");
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
+ subscribe(newConn, "a-sub");
frame = newConn.receiveFrame();
- System.out.println("received " + frame);
+ IntegrationTestLogger.LOGGER.info("received " + frame);
- assertEquals("MESSAGE", frame.getCommand());
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
- assertEquals(cLen, frame.getHeader("content-length"));
+ assertEquals(cLen, frame.getHeader(Stomp.Headers.CONTENT_LENGTH));
//unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
+ unsubscribe(newConn, "a-sub");
newConn.disconnect();
}
@Test
public void testHeaderEncoding() throws Exception {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
+ conn.connect(defUser, defPass);
String body = "Hello World 1!";
String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length);
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.addHeader("content-length", cLen);
String hKey = "special-header\\\\\\n\\c";
String hVal = "\\c\\\\\\ngood";
- frame.addHeader(hKey, hVal);
- System.out.println("key: |" + hKey + "| val: |" + hVal + "|");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml")
+ .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen)
+ .addHeader(hKey, hVal);
+
+ IntegrationTestLogger.LOGGER.info("key: |" + hKey + "| val: |" + hVal + "|");
frame.setBody(body);
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
+ subscribe(newConn, "a-sub");
frame = newConn.receiveFrame();
- System.out.println("received " + frame);
+ IntegrationTestLogger.LOGGER.info("received " + frame);
- assertEquals("MESSAGE", frame.getCommand());
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
String value = frame.getHeader("special-header" + "\\" + "\n" + ":");
assertEquals(":" + "\\" + "\n" + "good", value);
//unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
+ unsubscribe(newConn, "a-sub");
newConn.disconnect();
}
@@ -394,8 +353,8 @@ public class StompV11Test extends StompV11TestBase {
*/
@Test
public void testHeaderUndefinedEscape() throws Exception {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
+ conn.connect(defUser, defPass);
+ ClientStompFrame frame = conn.createFrame("SEND");
String body = "Hello World 1!";
String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length);
@@ -411,9 +370,9 @@ public class StompV11Test extends StompV11TestBase {
frame.setBody(body);
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
- ClientStompFrame error = connV11.receiveFrame();
+ ClientStompFrame error = conn.receiveFrame();
System.out.println("received " + error);
@@ -425,106 +384,96 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testHeartBeat() throws Exception {
//no heart beat at all if heart-beat absent
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- ClientStompFrame reply = connV11.sendFrame(frame);
+ ClientStompFrame reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
Thread.sleep(5000);
- assertEquals(0, connV11.getFrameQueueSize());
+ assertEquals(0, conn.getFrameQueueSize());
- connV11.disconnect();
+ conn.disconnect();
//default heart beat for (0,0) which is default connection TTL (60000) / default heartBeatToTtlModifier (2.0) = 30000
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "0,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "0,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,30000", reply.getHeader("heart-beat"));
+ assertEquals("0,30000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(5000);
- assertEquals(0, connV11.getFrameQueueSize());
+ assertEquals(0, conn.getFrameQueueSize());
- connV11.disconnect();
+ conn.disconnect();
//heart-beat (1,0), should receive a min client ping accepted by server
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,500", reply.getHeader("heart-beat"));
+ assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(2000);
//now server side should be disconnected because we didn't send ping for 2 sec
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will fail
try {
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
fail("connection should have been destroyed by now");
} catch (IOException e) {
//ignore
}
//heart-beat (1,0), start a ping, then send a message, should be ok.
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,500", reply.getHeader("heart-beat"));
+ assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
- System.out.println("========== start pinger!");
+ IntegrationTestLogger.LOGGER.info("========== start pinger!");
- connV11.startPinger(500);
+ conn.startPinger(500);
Thread.sleep(2000);
//server side should still be connected because the pinger kept sending pings during the 2 sec
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will be ok
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
- connV11.stopPinger();
+ conn.stopPinger();
- connV11.disconnect();
+ conn.disconnect();
}
@@ -532,82 +481,72 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testHeartBeat2() throws Exception {
//heart-beat (1,1)
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,1");
- frame.addHeader("accept-version", "1.0,1.1");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,1")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- ClientStompFrame reply = connV11.sendFrame(frame);
+ ClientStompFrame reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
- assertEquals("500,500", reply.getHeader("heart-beat"));
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
+ assertEquals("500,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
- connV11.disconnect();
+ conn.disconnect();
//heart-beat (500,1000)
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("1000,500", reply.getHeader("heart-beat"));
+ assertEquals("1000,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
- System.out.println("========== start pinger!");
+ IntegrationTestLogger.LOGGER.info("========== start pinger!");
- connV11.startPinger(500);
+ conn.startPinger(500);
Thread.sleep(10000);
//now check the frame size
- int size = connV11.getServerPingNumber();
+ int size = conn.getServerPingNumber();
- System.out.println("ping received: " + size);
+ IntegrationTestLogger.LOGGER.info("ping received: " + size);
assertTrue(size > 5);
//server side should still be connected because the pinger has been sending pings
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will be ok
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testSendWithHeartBeatsAndReceive() throws Exception {
StompClientConnection newConn = null;
try {
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- connV11.sendFrame(frame);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- connV11.startPinger(500);
+ conn.sendFrame(frame);
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
+ conn.startPinger(500);
for (int i = 0; i < 10; i++) {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!");
Thread.sleep(500);
}
@@ -615,12 +554,7 @@ public class StompV11Test extends StompV11TestBase {
newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
+ subscribe(newConn, "a-sub");
int cnt = 0;
@@ -635,38 +569,32 @@ public class StompV11Test extends StompV11TestBase {
assertEquals(10, cnt);
// unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
+ unsubscribe(newConn, "a-sub");
} finally {
if (newConn != null)
newConn.disconnect();
- connV11.disconnect();
+ conn.disconnect();
}
}
@Test
public void testSendAndReceiveWithHeartBeats() throws Exception {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
+ conn.connect(defUser, defPass);
for (int i = 0; i < 10; i++) {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!");
Thread.sleep(500);
}
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
try {
- frame = newConn.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
+ ClientStompFrame frame = newConn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
newConn.sendFrame(frame);
@@ -674,12 +602,7 @@ public class StompV11Test extends StompV11TestBase {
Thread.sleep(500);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
+ subscribe(newConn, "a-sub");
int cnt = 0;
@@ -694,9 +617,7 @@ public class StompV11Test extends StompV11TestBase {
assertEquals(10, cnt);
// unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
+ unsubscribe(newConn, "a-sub");
} finally {
newConn.disconnect();
}
@@ -706,35 +627,30 @@ public class StompV11Test extends StompV11TestBase {
public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception {
StompClientConnection newConn = null;
try {
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
-
- connV11.sendFrame(frame);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- connV11.startPinger(500);
+ conn.sendFrame(frame);
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
+ conn.startPinger(500);
for (int i = 0; i < 10; i++) {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!");
Thread.sleep(500);
}
// subscribe
newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = newConn.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = newConn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
newConn.sendFrame(frame);
@@ -742,12 +658,7 @@ public class StompV11Test extends StompV11TestBase {
Thread.sleep(500);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
+ subscribe(newConn, "a-sub");
int cnt = 0;
@@ -761,13 +672,11 @@ public class StompV11Test extends StompV11TestBase {
assertEquals(10, cnt);
// unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
+ unsubscribe(newConn, "a-sub");
} finally {
if (newConn != null)
newConn.disconnect();
- connV11.disconnect();
+ conn.disconnect();
}
}
@@ -781,14 +690,14 @@ public class StompV11Test extends StompV11TestBase {
StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
//no heart beat at all if heart-beat absent
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
Thread.sleep(3000);
@@ -805,20 +714,20 @@ public class StompV11Test extends StompV11TestBase {
//no heart beat for (0,0)
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "0,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "0,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
IntegrationTestLogger.LOGGER.info("Reply: " + reply);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,500", reply.getHeader("heart-beat"));
+ assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(3000);
@@ -835,30 +744,25 @@ public class StompV11Test extends StompV11TestBase {
//heart-beat (1,0), should receive a min client ping accepted by server
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,2500", reply.getHeader("heart-beat"));
+ assertEquals("0,2500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(7000);
//now server side should be disconnected because we didn't send ping for 2 sec
- frame = connection.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will fail
try {
- connection.sendFrame(frame);
+ send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
fail("connection should have been destroyed by now");
} catch (IOException e) {
//ignore
@@ -866,33 +770,28 @@ public class StompV11Test extends StompV11TestBase {
//heart-beat (1,0), start a ping, then send a message, should be ok.
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,2500", reply.getHeader("heart-beat"));
+ assertEquals("0,2500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
- System.out.println("========== start pinger!");
+ IntegrationTestLogger.LOGGER.info("========== start pinger!");
connection.startPinger(2500);
Thread.sleep(7000);
//now server side should still be connected because the pinger has been sending pings
- frame = connection.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will be ok
- connection.sendFrame(frame);
+ send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
connection.stopPinger();
@@ -900,30 +799,25 @@ public class StompV11Test extends StompV11TestBase {
//heart-beat (20000,0), should receive a max client ping accepted by server
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "20000,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "20000,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,5000", reply.getHeader("heart-beat"));
+ assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(12000);
//now server side should be disconnected because we didn't send ping for 2 sec
- frame = connection.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will fail
try {
- connection.sendFrame(frame);
+ send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
fail("connection should have been destroyed by now");
} catch (IOException e) {
//ignore
@@ -940,18 +834,18 @@ public class StompV11Test extends StompV11TestBase {
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1").start();
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "5000,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "5000,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,5000", reply.getHeader("heart-beat"));
+ assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(6000);
@@ -966,18 +860,18 @@ public class StompV11Test extends StompV11TestBase {
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1.5").start();
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "5000,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "5000,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,5000", reply.getHeader("heart-beat"));
+ assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(6000);
@@ -986,21 +880,21 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testNack() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- nack(connV11, "sub1", messageID);
+ nack(conn, "sub1", messageID);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//Nack makes the message be dropped.
MessageConsumer consumer = session.createConsumer(queue);
@@ -1010,25 +904,25 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testNackWithWrongSubId() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- nack(connV11, "sub2", messageID);
+ nack(conn, "sub2", messageID);
- ClientStompFrame error = connV11.receiveFrame();
+ ClientStompFrame error = conn.receiveFrame();
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should be still there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1038,25 +932,25 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testNackWithWrongMessageId() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- frame.getHeader("message-id");
+ frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- nack(connV11, "sub2", "someother");
+ nack(conn, "sub2", "someother");
- ClientStompFrame error = connV11.receiveFrame();
+ ClientStompFrame error = conn.receiveFrame();
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should still be there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1066,21 +960,21 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAck() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- ack(connV11, "sub1", messageID, null);
+ ack(conn, "sub1", messageID, null);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//Ack makes the message be consumed.
MessageConsumer consumer = session.createConsumer(queue);
@@ -1090,25 +984,25 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckWithWrongSubId() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- ack(connV11, "sub2", messageID, null);
+ ack(conn, "sub2", messageID, null);
- ClientStompFrame error = connV11.receiveFrame();
+ ClientStompFrame error = conn.receiveFrame();
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should be still there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1118,25 +1012,25 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckWithWrongMessageId() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- frame.getHeader("message-id");
+ frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- ack(connV11, "sub2", "someother", null);
+ ack(conn, "sub2", "someother", null);
- ClientStompFrame error = connV11.receiveFrame();
+ ClientStompFrame error = conn.receiveFrame();
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should still be there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1146,33 +1040,33 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testErrorWithReceipt() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- ClientStompFrame ackFrame = connV11.createFrame("ACK");
//give it a wrong sub id
- ackFrame.addHeader("subscription", "sub2");
- ackFrame.addHeader("message-id", messageID);
- ackFrame.addHeader("receipt", "answer-me");
+ ClientStompFrame ackFrame = conn.createFrame(Stomp.Commands.ACK)
+ .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, "sub2")
+ .addHeader(Stomp.Headers.Message.MESSAGE_ID, messageID)
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, "answer-me");
- ClientStompFrame error = connV11.sendFrame(ackFrame);
+ ClientStompFrame error = conn.sendFrame(ackFrame);
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- assertEquals("ERROR", error.getCommand());
+ assertEquals(Stomp.Responses.ERROR, error.getCommand());
- assertEquals("answer-me", error.getHeader("receipt-id"));
+ assertEquals("answer-me", error.getHeader(Stomp.Headers.Response.RECEIPT_ID));
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should still be there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1182,33 +1076,33 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testErrorWithReceipt2() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- ClientStompFrame ackFrame = connV11.createFrame("ACK");
//give it a wrong message id
- ackFrame.addHeader("subscription", "sub1");
- ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) + 1));
- ackFrame.addHeader("receipt", "answer-me");
+ ClientStompFrame ackFrame = conn.createFrame(Stomp.Commands.ACK)
+ .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, "sub1")
+ .addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(Long.valueOf(messageID) + 1))
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, "answer-me");
- ClientStompFrame error = connV11.sendFrame(ackFrame);
+ ClientStompFrame error = conn.sendFrame(ackFrame);
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- assertEquals("ERROR", error.getCommand());
+ assertEquals(Stomp.Responses.ERROR, error.getCommand());
- assertEquals("answer-me", error.getHeader("receipt-id"));
+ assertEquals("answer-me", error.getHeader(Stomp.Headers.Response.RECEIPT_ID));
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should still be there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1218,29 +1112,29 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckModeClient() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++) {
- this.sendMessage("client-ack" + i);
+ this.sendJmsMessage("client-ack" + i);
}
ClientStompFrame frame = null;
for (int i = 0; i < num; i++) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
assertNotNull(frame);
}
//ack the last
- this.ack(connV11, "sub1", frame);
+ this.ack(conn, "sub1", frame);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
@@ -1250,31 +1144,31 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckModeClient2() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++) {
- this.sendMessage("client-ack" + i);
+ this.sendJmsMessage("client-ack" + i);
}
ClientStompFrame frame = null;
for (int i = 0; i < num; i++) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
assertNotNull(frame);
//ack the 49th
if (i == num - 2) {
- this.ack(connV11, "sub1", frame);
+ this.ack(conn, "sub1", frame);
}
}
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
@@ -1286,26 +1180,26 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckModeAuto() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "auto");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++) {
- this.sendMessage("auto-ack" + i);
+ this.sendJmsMessage("auto-ack" + i);
}
ClientStompFrame frame = null;
for (int i = 0; i < num; i++) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
assertNotNull(frame);
}
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
@@ -1315,32 +1209,32 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckModeClientIndividual() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client-individual");
+ subscribe(conn, "sub1", "client-individual");
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++) {
- this.sendMessage("client-individual-ack" + i);
+ this.sendJmsMessage("client-individual-ack" + i);
}
ClientStompFrame frame = null;
for (int i = 0; i < num; i++) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
assertNotNull(frame);
- System.out.println(i + " == received: " + frame);
+ IntegrationTestLogger.LOGGER.info(i + " == received: " + frame);
//ack on even numbers
if (i % 2 == 0) {
- this.ack(connV11, "sub1", frame);
+ this.ack(conn, "sub1", frame);
}
}
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//only the un-acked (odd-numbered) messages can still be received.
MessageConsumer consumer = session.createConsumer(queue);
@@ -1349,7 +1243,7 @@ public class StompV11Test extends StompV11TestBase {
for (int i = 0; i < num / 2; i++) {
message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
- System.out.println("Legal: " + message.getText());
+ IntegrationTestLogger.LOGGER.info("Legal: " + message.getText());
}
message = (TextMessage) consumer.receive(1000);
@@ -1359,64 +1253,55 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testTwoSubscribers() throws Exception {
- connV11.connect(defUser, defPass, CLIENT_ID);
+ conn.connect(defUser, defPass, CLIENT_ID);
- this.subscribeTopic(connV11, "sub1", "auto", null);
+ this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass, "myclientid2");
- this.subscribeTopic(newConn, "sub2", "auto", null);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getTopicPrefix() + getTopicName());
-
- frame.setBody("Hello World");
+ this.subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
- connV11.sendFrame(frame);
+ send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
// receive message from socket
- frame = connV11.receiveFrame(1000);
+ ClientStompFrame frame = conn.receiveFrame(1000);
- System.out.println("received frame : " + frame);
+ IntegrationTestLogger.LOGGER.info("received frame : " + frame);
assertEquals("Hello World", frame.getBody());
- assertEquals("sub1", frame.getHeader("subscription"));
+ assertEquals("sub1", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
frame = newConn.receiveFrame(1000);
- System.out.println("received 2 frame : " + frame);
+ IntegrationTestLogger.LOGGER.info("received 2 frame : " + frame);
assertEquals("Hello World", frame.getBody());
- assertEquals("sub2", frame.getHeader("subscription"));
+ assertEquals("sub2", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
// remove subscription
- this.unsubscribe(connV11, "sub1", true);
+ this.unsubscribe(conn, "sub1", true);
this.unsubscribe(newConn, "sub2", true);
- connV11.disconnect();
+ conn.disconnect();
newConn.disconnect();
}
@Test
public void testSendAndReceiveOnDifferentConnections() throws Exception {
- connV11.connect(defUser, defPass);
-
- ClientStompFrame sendFrame = connV11.createFrame("SEND");
- sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- sendFrame.setBody("Hello World");
+ conn.connect(defUser, defPass);
- connV11.sendFrame(sendFrame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
connV11_2.connect(defUser, defPass);
- this.subscribe(connV11_2, "sub1", "auto");
+ this.subscribe(connV11_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
ClientStompFrame frame = connV11_2.receiveFrame(2000);
- assertEquals("MESSAGE", frame.getCommand());
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
assertEquals("Hello World", frame.getBody());
- connV11.disconnect();
+ conn.disconnect();
connV11_2.disconnect();
}
@@ -1424,79 +1309,81 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testBeginSameTransactionTwice() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- beginTransaction(connV11, "tx1");
+ beginTransaction(conn, "tx1");
- beginTransaction(connV11, "tx1");
+ beginTransaction(conn, "tx1");
- ClientStompFrame f = connV11.receiveFrame();
- Assert.assertTrue(f.getCommand().equals("ERROR"));
+ ClientStompFrame f = conn.receiveFrame();
+ Assert.assertTrue(f.getCommand().equals(Stomp.Responses.ERROR));
}
@Test
public void testBodyWithUTF8() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, getName(), "auto");
+ this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.AUTO);
String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
- System.out.println(text);
- sendMessage(text);
+ IntegrationTestLogger.LOGGER.info(text);
+ sendJmsMessage(text);
- ClientStompFrame frame = connV11.receiveFrame();
- System.out.println(frame);
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
+ ClientStompFrame frame = conn.receiveFrame();
+ IntegrationTestLogger.LOGGER.info(frame);
+ Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
+ Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION));
Assert.assertTrue(frame.getBody().equals(text));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testClientAckNotPartOfTransaction() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, getName(), "client");
+ this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
+ Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION));
Assert.assertTrue(frame.getBody().equals(getName()));
- Assert.assertNotNull(frame.getHeader("message-id"));
+ Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- beginTransaction(connV11, "tx1");
+ beginTransaction(conn, "tx1");
- this.ack(connV11, getName(), messageID, "tx1");
+ this.ack(conn, getName(), messageID, "tx1");
- abortTransaction(connV11, "tx1");
+ abortTransaction(conn, "tx1");
- frame = connV11.receiveFrame(500);
+ frame = conn.receiveFrame(500);
assertNull(frame);
- this.unsubscribe(connV11, getName());
+ this.unsubscribe(conn, getName());
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testDisconnectAndError() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
+
+ this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- this.subscribe(connV11, getName(), "client");
+ String uuid = UUID.randomUUID().toString();
- ClientStompFrame frame = connV11.createFrame("DISCONNECT");
- frame.addHeader("receipt", "1");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.DISCONNECT)
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
- ClientStompFrame result = connV11.sendFrame(frame);
+ ClientStompFrame result = conn.sendFrame(frame);
- if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) {
+ if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
fail("Disconnect failed! " + result);
}
@@ -1505,12 +1392,9 @@ public class StompV11Test extends StompV11TestBase {
Thread thr = new Thread() {
@Override
public void run() {
- ClientStompFrame sendFrame = connV11.createFrame("SEND");
- sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- sendFrame.setBody("Hello World");
while (latch.getCount() != 0) {
try {
- connV11.sendFrame(sendFrame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
Thread.sleep(500);
} catch (InterruptedException e) {
//retry
@@ -1523,7 +1407,7 @@ public class StompV11Test extends StompV11TestBase {
latch.countDown();
break;
} finally {
- connV11.destroy();
+ conn.destroy();
}
}
}
@@ -1543,66 +1427,68 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testDurableSubscriber() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, "sub1", "client", getName());
+ this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName());
- this.subscribe(connV11, "sub1", "client", getName());
+ this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName());
- ClientStompFrame frame = connV11.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("ERROR"));
+ ClientStompFrame frame = conn.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testDurableSubscriberWithReconnection() throws Exception {
- connV11.connect(defUser, defPass, CLIENT_ID);
+ conn.connect(defUser, defPass, CLIENT_ID);
- this.subscribeTopic(connV11, "sub1", "auto", getName());
+ this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
- ClientStompFrame frame = connV11.createFrame("DISCONNECT");
- frame.addHeader("receipt", "1");
+ String uuid = UUID.randomUUID().toString();
- ClientStompFrame result = connV11.sendFrame(frame);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.DISCONNECT)
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
- if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) {
+ ClientStompFrame result = conn.sendFrame(frame);
+
+ if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
fail("Disconnect failed! " + result);
}
// send the message when the durable subscriber is disconnected
- sendMessage(getName(), topic);
+ sendJmsMessage(getName(), topic);
- connV11.destroy();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- connV11.connect(defUser, defPass, CLIENT_ID);
+ conn.destroy();
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ conn.connect(defUser, defPass, CLIENT_ID);
- this.subscribeTopic(connV11, "sub1", "auto", getName());
+ this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
// we must have received the message
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
+ Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION));
Assert.assertEquals(getName(), frame.getBody());
- this.unsubscribe(connV11, "sub1");
+ this.unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testDurableUnSubscribe() throws Exception {
- connV11.connect(defUser, defPass, CLIENT_ID);
+ conn.connect(defUser, defPass, CLIENT_ID);
- this.subscribeTopic(connV11, null, "auto", getName());
+ this.subscribeTopic(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
- connV11.disconnect();
- connV11.destroy();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- connV11.connect(defUser, defPass, CLIENT_ID);
+ conn.disconnect();
+ conn.destroy();
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ conn.connect(defUser, defPass, CLIENT_ID);
- this.unsubscribe(connV11, getName(), false, true);
+ this.unsubscribe(conn, getName(), null, false, true);
long start = System.currentTimeMillis();
SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
@@ -1612,21 +1498,21 @@ public class StompV11Test extends StompV11TestBase {
assertNull(server.getActiveMQServer().locateQueue(queueName));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("JMSXGroupID", "TEST");
- frame.setBody("Hello World");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader("JMSXGroupID", "TEST")
+ .setBody("Hello World");
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1640,64 +1526,64 @@ public class StompV11Test extends StompV11TestBase {
int ctr = 10;
String[] data = new String[ctr];
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, "sub1", "auto");
+ this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
for (int i = 0; i < ctr; ++i) {
data[i] = getName() + i;
- sendMessage(data[i]);
+ sendJmsMessage(data[i]);
}
ClientStompFrame frame = null;
for (int i = 0; i < ctr; ++i) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
}
for (int i = 0; i < ctr; ++i) {
data[i] = getName() + ":second:" + i;
- sendMessage(data[i]);
+ sendJmsMessage(data[i]);
}
for (int i = 0; i < ctr; ++i) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
}
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testSubscribeWithAutoAckAndSelector() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, "sub1", "auto", null, "foo = 'zzz'");
+ this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'");
- sendMessage("Ignored message", "foo", "1234");
- sendMessage("Real message", "foo", "zzz");
+ sendJmsMessage("Ignored message", "foo", "1234");
+ sendJmsMessage("Real message", "foo", "zzz");
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message"));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testRedeliveryWithClientAck() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, "subId", "client");
+ this.subscribe(conn, "subscriptionId", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- assertTrue(frame.getCommand().equals("MESSAGE"));
+ assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
- connV11.disconnect();
+ conn.disconnect();
// message should be received since message was not acknowledged
MessageConsumer consumer = session.createConsumer(queue);
@@ -1710,7 +1596,7 @@ public class StompV11Test extends StompV11TestBase {
public void testSendManyMessages() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
int count = 1000;
final CountDownLatch latch = new CountDownLatch(count);
@@ -1721,30 +1607,22 @@ public class StompV11Test extends StompV11TestBase {
}
});
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
for (int i = 1; i <= count; i++) {
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
}
assertTrue(latch.await(60, TimeUnit.SECONDS));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testSendMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1763,18 +1641,16 @@ public class StompV11Test extends StompV11TestBase {
public void testSendMessageWithContentLength() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
byte[] data = new byte[]{1, 0, 0, 4};
- ClientStompFrame frame = connV11.createFrame("SEND");
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody(new String(data, StandardCharsets.UTF_8));
-
- frame.addHeader("content-length", String.valueOf(data.length));
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .setBody(new String(data, StandardCharsets.UTF_8))
+ .addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
BytesMessage message = (BytesMessage) consumer.receive(10000);
Assert.assertNotNull(message);
@@ -1790,16 +1666,15 @@ public class StompV11Test extends StompV11TestBase {
public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("foo", "abc");
- frame.addHeader("bar", "123");
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader("foo", "abc")
+ .addHeader("bar", "123")
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .setBody("Hello World");
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1812,14 +1687,13 @@ public class StompV11Test extends StompV11TestBase {
public void testSendMessageWithLeadingNewLine() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .setBody("Hello World");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
- connV11.sendWickedFrame(frame);
+ conn.sendWickedFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1833,24 +1707,17 @@ public class StompV11Test extends StompV11TestBase {
assertNull(consumer.receive(1000));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testSendMessageWithReceipt() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("receipt", "1234");
- frame.setBody("Hello World");
-
- frame = connV11.sendFrame(frame);
- assertTrue(frame.getCommand().equals("RECEIPT"));
- assertEquals("1234", frame.getHeader("receipt-id"));
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1862,28 +1729,27 @@ public class StompV11Test extends StompV11TestBase {
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testSendMessageWithStandardHeaders() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("correlation-id", "c123");
- frame.addHeader("persistent", "true");
- frame.addHeader("priority", "3");
- frame.addHeader("type", "t345");
- frame.addHeader("JMSXGroupID", "abc");
- frame.addHeader("foo", "abc");
- frame.addHeader("bar", "123");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader(Stomp.Headers.Message.CORRELATION_ID, "c123")
+ .addHeader(Stomp.Headers.Message.PERSISTENT, "true")
+ .addHeader(Stomp.Headers.Message.PRIORITY, "3")
+ .addHeader(Stomp.Headers.Message.TYPE, "t345")
+ .addHeader("JMSXGroupID", "abc")
+ .addHeader("foo", "abc")
+ .addHeader("bar", "123")
+ .setBody("Hello World");
- frame.setBody("Hello World");
-
- frame = connV11.sendFrame(frame);
+ frame = conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer
<TRUNCATED>