You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/11 19:23:34 UTC
[04/11] activemq-artemis git commit: Stomp refactor + track
autocreation for addresses
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index da69d96..e8a6f25 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -32,8 +32,10 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
@@ -46,28 +48,28 @@ import org.junit.Test;
/*
*
*/
-public class StompV11Test extends StompV11TestBase {
+public class StompV11Test extends StompTestBase {
private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
public static final String CLIENT_ID = "myclientid";
- private StompClientConnection connV11;
+ private StompClientConnection conn;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
}
@Override
@After
public void tearDown() throws Exception {
try {
- boolean connected = connV11 != null && connV11.isConnected();
- log.debug("Connection 11 : " + connected);
+ boolean connected = conn != null && conn.isConnected();
+ log.debug("Connection 1.1 : " + connected);
if (connected) {
- connV11.disconnect();
+ conn.disconnect();
}
} finally {
super.tearDown();
@@ -115,276 +117,234 @@ public class StompV11Test extends StompV11TestBase {
conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
ClientStompFrame frame = conn.connect("invaliduser", defPass);
assertFalse(conn.isConnected());
- assertTrue("ERROR".equals(frame.getCommand()));
+ assertTrue(Stomp.Responses.ERROR.equals(frame.getCommand()));
assertTrue(frame.getBody().contains("Security Error occurred"));
}
@Test
public void testNegotiation() throws Exception {
// case 1 accept-version absent. It is a 1.0 connect
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- ClientStompFrame reply = connV11.sendFrame(frame);
+ ClientStompFrame reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
//reply headers: version, session, server
assertEquals(null, reply.getHeader("version"));
- connV11.disconnect();
+ conn.disconnect();
// case 2 accept-version=1.0, result: 1.0
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.0");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0")
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
//reply headers: version, session, server
assertEquals("1.0", reply.getHeader("version"));
- connV11.disconnect();
+ conn.disconnect();
// case 3 accept-version=1.1, result: 1.1
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.1");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.1")
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
//reply headers: version, session, server
assertEquals("1.1", reply.getHeader("version"));
- connV11.disconnect();
+ conn.disconnect();
// case 4 accept-version=1.0,1.1,1.2, result 1.1
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.0,1.1,1.3");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1,1.3")
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
//reply headers: version, session, server
assertEquals("1.1", reply.getHeader("version"));
- connV11.disconnect();
+ conn.disconnect();
// case 5 accept-version=1.2, result error
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("accept-version", "1.3");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.3")
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("ERROR", reply.getCommand());
+ assertEquals(Stomp.Responses.ERROR, reply.getCommand());
- System.out.println("Got error frame " + reply);
+ IntegrationTestLogger.LOGGER.info("Got error frame " + reply);
}
@Test
public void testSendAndReceive() throws Exception {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World 1!");
+ conn.connect(defUser, defPass);
- ClientStompFrame response = connV11.sendFrame(frame);
+ ClientStompFrame response = send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!");
assertNull(response);
- frame.addHeader("receipt", "1234");
- frame.setBody("Hello World 2!");
-
- response = connV11.sendFrame(frame);
-
- assertNotNull(response);
+ String uuid = UUID.randomUUID().toString();
- assertEquals("RECEIPT", response.getCommand());
-
- assertEquals("1234", response.getHeader("receipt-id"));
+ response = send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 2!", true);
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
+ subscribe(newConn, "a-sub");
- frame = newConn.receiveFrame();
+ ClientStompFrame frame = newConn.receiveFrame();
- System.out.println("received " + frame);
+ IntegrationTestLogger.LOGGER.info("received " + frame);
- assertEquals("MESSAGE", frame.getCommand());
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
- assertEquals("a-sub", frame.getHeader("subscription"));
+ assertEquals("a-sub", frame.getHeader(Stomp.Headers.Ack.SUBSCRIPTION));
- assertNotNull(frame.getHeader("message-id"));
+ assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
- assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+ assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Message.DESTINATION));
assertEquals("Hello World 1!", frame.getBody());
frame = newConn.receiveFrame();
- System.out.println("received " + frame);
+ IntegrationTestLogger.LOGGER.info("received " + frame);
- //unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
+ unsubscribe(newConn, "a-sub");
newConn.disconnect();
}
@Test
public void testHeaderContentType() throws Exception {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.setBody("Hello World 1!");
-
- connV11.sendFrame(frame);
+ conn.connect(defUser, defPass);
+ send(conn, getQueuePrefix() + getQueueName(), "application/xml", "Hello World 1!");
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
+ subscribe(newConn, "a-sub");
- newConn.sendFrame(subFrame);
+ ClientStompFrame frame = newConn.receiveFrame();
- frame = newConn.receiveFrame();
-
- System.out.println("received " + frame);
+ IntegrationTestLogger.LOGGER.info("received " + frame);
- assertEquals("MESSAGE", frame.getCommand());
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
- assertEquals("application/xml", frame.getHeader("content-type"));
+ assertEquals("application/xml", frame.getHeader(Stomp.Headers.CONTENT_TYPE));
//unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
+ unsubscribe(newConn, "a-sub");
newConn.disconnect();
}
@Test
public void testHeaderContentLength() throws Exception {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
+ conn.connect(defUser, defPass);
String body = "Hello World 1!";
String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length);
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.addHeader("content-length", cLen);
- frame.setBody(body + "extra");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml")
+ .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen)
+ .setBody(body + "extra");
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
+ subscribe(newConn, "a-sub");
frame = newConn.receiveFrame();
- System.out.println("received " + frame);
+ IntegrationTestLogger.LOGGER.info("received " + frame);
- assertEquals("MESSAGE", frame.getCommand());
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
- assertEquals(cLen, frame.getHeader("content-length"));
+ assertEquals(cLen, frame.getHeader(Stomp.Headers.CONTENT_LENGTH));
//unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
+ unsubscribe(newConn, "a-sub");
newConn.disconnect();
}
@Test
public void testHeaderEncoding() throws Exception {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
+ conn.connect(defUser, defPass);
String body = "Hello World 1!";
String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length);
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "application/xml");
- frame.addHeader("content-length", cLen);
String hKey = "special-header\\\\\\n\\c";
String hVal = "\\c\\\\\\ngood";
- frame.addHeader(hKey, hVal);
- System.out.println("key: |" + hKey + "| val: |" + hVal + "|");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml")
+ .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen)
+ .addHeader(hKey, hVal);
+
+ IntegrationTestLogger.LOGGER.info("key: |" + hKey + "| val: |" + hVal + "|");
frame.setBody(body);
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
+ subscribe(newConn, "a-sub");
frame = newConn.receiveFrame();
- System.out.println("received " + frame);
+ IntegrationTestLogger.LOGGER.info("received " + frame);
- assertEquals("MESSAGE", frame.getCommand());
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
String value = frame.getHeader("special-header" + "\\" + "\n" + ":");
assertEquals(":" + "\\" + "\n" + "good", value);
//unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
+ unsubscribe(newConn, "a-sub");
newConn.disconnect();
}
@@ -392,106 +352,96 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testHeartBeat() throws Exception {
//no heart beat at all if heat-beat absent
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
- ClientStompFrame reply = connV11.sendFrame(frame);
+ ClientStompFrame reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
Thread.sleep(5000);
- assertEquals(0, connV11.getFrameQueueSize());
+ assertEquals(0, conn.getFrameQueueSize());
- connV11.disconnect();
+ conn.disconnect();
//default heart beat for (0,0) which is default connection TTL (60000) / default heartBeatToTtlModifier (2.0) = 30000
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "0,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "0,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,30000", reply.getHeader("heart-beat"));
+ assertEquals("0,30000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(5000);
- assertEquals(0, connV11.getFrameQueueSize());
+ assertEquals(0, conn.getFrameQueueSize());
- connV11.disconnect();
+ conn.disconnect();
//heart-beat (1,0), should receive a min client ping accepted by server
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,500", reply.getHeader("heart-beat"));
+ assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(2000);
//now server side should be disconnected because we didn't send ping for 2 sec
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will fail
try {
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
fail("connection should have been destroyed by now");
} catch (IOException e) {
//ignore
}
//heart-beat (1,0), start a ping, then send a message, should be ok.
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,500", reply.getHeader("heart-beat"));
+ assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
- System.out.println("========== start pinger!");
+ IntegrationTestLogger.LOGGER.info("========== start pinger!");
- connV11.startPinger(500);
+ conn.startPinger(500);
Thread.sleep(2000);
//now server side should be disconnected because we didn't send ping for 2 sec
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will be ok
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
- connV11.stopPinger();
+ conn.stopPinger();
- connV11.disconnect();
+ conn.disconnect();
}
@@ -499,82 +449,72 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testHeartBeat2() throws Exception {
//heart-beat (1,1)
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,1");
- frame.addHeader("accept-version", "1.0,1.1");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,1")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- ClientStompFrame reply = connV11.sendFrame(frame);
+ ClientStompFrame reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
- assertEquals("500,500", reply.getHeader("heart-beat"));
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
+ assertEquals("500,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
- connV11.disconnect();
+ conn.disconnect();
//heart-beat (500,1000)
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- reply = connV11.sendFrame(frame);
+ reply = conn.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("1000,500", reply.getHeader("heart-beat"));
+ assertEquals("1000,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
- System.out.println("========== start pinger!");
+ IntegrationTestLogger.LOGGER.info("========== start pinger!");
- connV11.startPinger(500);
+ conn.startPinger(500);
Thread.sleep(10000);
//now check the frame size
- int size = connV11.getServerPingNumber();
+ int size = conn.getServerPingNumber();
- System.out.println("ping received: " + size);
+ IntegrationTestLogger.LOGGER.info("ping received: " + size);
assertTrue(size > 5);
//now server side should be disconnected because we didn't send ping for 2 sec
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will be ok
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testSendWithHeartBeatsAndReceive() throws Exception {
StompClientConnection newConn = null;
try {
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
- connV11.startPinger(500);
-
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
+ conn.startPinger(500);
for (int i = 0; i < 10; i++) {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!");
Thread.sleep(500);
}
@@ -582,12 +522,7 @@ public class StompV11Test extends StompV11TestBase {
newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
+ subscribe(newConn, "a-sub");
int cnt = 0;
@@ -602,38 +537,32 @@ public class StompV11Test extends StompV11TestBase {
assertEquals(10, cnt);
// unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
+ unsubscribe(newConn, "a-sub");
} finally {
if (newConn != null)
newConn.disconnect();
- connV11.disconnect();
+ conn.disconnect();
}
}
@Test
public void testSendAndReceiveWithHeartBeats() throws Exception {
- connV11.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
+ conn.connect(defUser, defPass);
for (int i = 0; i < 10; i++) {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!");
Thread.sleep(500);
}
//subscribe
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
try {
- frame = newConn.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
+ ClientStompFrame frame = newConn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
newConn.sendFrame(frame);
@@ -641,12 +570,7 @@ public class StompV11Test extends StompV11TestBase {
Thread.sleep(500);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
+ subscribe(newConn, "a-sub");
int cnt = 0;
@@ -661,9 +585,7 @@ public class StompV11Test extends StompV11TestBase {
assertEquals(10, cnt);
// unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
+ unsubscribe(newConn, "a-sub");
} finally {
newConn.disconnect();
}
@@ -673,35 +595,30 @@ public class StompV11Test extends StompV11TestBase {
public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception {
StompClientConnection newConn = null;
try {
- ClientStompFrame frame = connV11.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
- connV11.startPinger(500);
-
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
+ conn.startPinger(500);
for (int i = 0; i < 10; i++) {
- frame.setBody("Hello World " + i + "!");
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!");
Thread.sleep(500);
}
// subscribe
newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- frame = newConn.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "500,1000");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = newConn.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
newConn.sendFrame(frame);
@@ -709,12 +626,7 @@ public class StompV11Test extends StompV11TestBase {
Thread.sleep(500);
- ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- newConn.sendFrame(subFrame);
+ subscribe(newConn, "a-sub");
int cnt = 0;
@@ -728,13 +640,11 @@ public class StompV11Test extends StompV11TestBase {
assertEquals(10, cnt);
// unsub
- ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- newConn.sendFrame(unsubFrame);
+ unsubscribe(newConn, "a-sub");
} finally {
if (newConn != null)
newConn.disconnect();
- connV11.disconnect();
+ conn.disconnect();
}
}
@@ -748,14 +658,14 @@ public class StompV11Test extends StompV11TestBase {
StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
//no heart beat at all if heat-beat absent
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
Thread.sleep(3000);
@@ -772,20 +682,20 @@ public class StompV11Test extends StompV11TestBase {
//no heart beat for (0,0)
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "0,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "0,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
IntegrationTestLogger.LOGGER.info("Reply: " + reply);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,500", reply.getHeader("heart-beat"));
+ assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(3000);
@@ -802,30 +712,25 @@ public class StompV11Test extends StompV11TestBase {
//heart-beat (1,0), should receive a min client ping accepted by server
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,2500", reply.getHeader("heart-beat"));
+ assertEquals("0,2500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(7000);
//now server side should be disconnected because we didn't send ping for 2 sec
- frame = connection.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will fail
try {
- connection.sendFrame(frame);
+ send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
fail("connection should have been destroyed by now");
} catch (IOException e) {
//ignore
@@ -833,33 +738,28 @@ public class StompV11Test extends StompV11TestBase {
//heart-beat (1,0), start a ping, then send a message, should be ok.
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "1,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,2500", reply.getHeader("heart-beat"));
+ assertEquals("0,2500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
- System.out.println("========== start pinger!");
+ IntegrationTestLogger.LOGGER.info("========== start pinger!");
connection.startPinger(2500);
Thread.sleep(7000);
//now server side should be disconnected because we didn't send ping for 2 sec
- frame = connection.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will be ok
- connection.sendFrame(frame);
+ send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
connection.stopPinger();
@@ -867,30 +767,25 @@ public class StompV11Test extends StompV11TestBase {
//heart-beat (20000,0), should receive a max client ping accepted by server
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "20000,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "20000,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,5000", reply.getHeader("heart-beat"));
+ assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(12000);
//now server side should be disconnected because we didn't send ping for 2 sec
- frame = connection.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-type", "text/plain");
- frame.setBody("Hello World");
-
//send will fail
try {
- connection.sendFrame(frame);
+ send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
fail("connection should have been destroyed by now");
} catch (IOException e) {
//ignore
@@ -907,18 +802,18 @@ public class StompV11Test extends StompV11TestBase {
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1").start();
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "5000,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "5000,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,5000", reply.getHeader("heart-beat"));
+ assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(6000);
@@ -933,18 +828,18 @@ public class StompV11Test extends StompV11TestBase {
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1.5").start();
connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
- frame = connection.createFrame("CONNECT");
- frame.addHeader("host", "127.0.0.1");
- frame.addHeader("login", this.defUser);
- frame.addHeader("passcode", this.defPass);
- frame.addHeader("heart-beat", "5000,0");
- frame.addHeader("accept-version", "1.0,1.1");
+ frame = connection.createFrame(Stomp.Commands.CONNECT)
+ .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+ .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+ .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+ .addHeader(Stomp.Headers.Connect.HEART_BEAT, "5000,0")
+ .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
reply = connection.sendFrame(frame);
- assertEquals("CONNECTED", reply.getCommand());
+ assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
- assertEquals("0,5000", reply.getHeader("heart-beat"));
+ assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
Thread.sleep(6000);
@@ -953,21 +848,21 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testNack() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- nack(connV11, "sub1", messageID);
+ nack(conn, "sub1", messageID);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//Nack makes the message be dropped.
MessageConsumer consumer = session.createConsumer(queue);
@@ -977,25 +872,25 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testNackWithWrongSubId() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- nack(connV11, "sub2", messageID);
+ nack(conn, "sub2", messageID);
- ClientStompFrame error = connV11.receiveFrame();
+ ClientStompFrame error = conn.receiveFrame();
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should be still there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1005,25 +900,25 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testNackWithWrongMessageId() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- frame.getHeader("message-id");
+ frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- nack(connV11, "sub2", "someother");
+ nack(conn, "sub2", "someother");
- ClientStompFrame error = connV11.receiveFrame();
+ ClientStompFrame error = conn.receiveFrame();
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should still there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1033,21 +928,21 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAck() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- ack(connV11, "sub1", messageID, null);
+ ack(conn, "sub1", messageID, null);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//Nack makes the message be dropped.
MessageConsumer consumer = session.createConsumer(queue);
@@ -1057,25 +952,25 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckWithWrongSubId() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- ack(connV11, "sub2", messageID, null);
+ ack(conn, "sub2", messageID, null);
- ClientStompFrame error = connV11.receiveFrame();
+ ClientStompFrame error = conn.receiveFrame();
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should be still there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1085,25 +980,25 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckWithWrongMessageId() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- frame.getHeader("message-id");
+ frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- ack(connV11, "sub2", "someother", null);
+ ack(conn, "sub2", "someother", null);
- ClientStompFrame error = connV11.receiveFrame();
+ ClientStompFrame error = conn.receiveFrame();
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should still there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1113,33 +1008,33 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testErrorWithReceipt() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- ClientStompFrame ackFrame = connV11.createFrame("ACK");
//give it a wrong sub id
- ackFrame.addHeader("subscription", "sub2");
- ackFrame.addHeader("message-id", messageID);
- ackFrame.addHeader("receipt", "answer-me");
+ ClientStompFrame ackFrame = conn.createFrame(Stomp.Commands.ACK)
+ .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, "sub2")
+ .addHeader(Stomp.Headers.Message.MESSAGE_ID, messageID)
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, "answer-me");
- ClientStompFrame error = connV11.sendFrame(ackFrame);
+ ClientStompFrame error = conn.sendFrame(ackFrame);
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- assertEquals("ERROR", error.getCommand());
+ assertEquals(Stomp.Responses.ERROR, error.getCommand());
- assertEquals("answer-me", error.getHeader("receipt-id"));
+ assertEquals("answer-me", error.getHeader(Stomp.Headers.Response.RECEIPT_ID));
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should still there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1149,33 +1044,33 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testErrorWithReceipt2() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- ClientStompFrame ackFrame = connV11.createFrame("ACK");
//give it a wrong sub id
- ackFrame.addHeader("subscription", "sub1");
- ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) + 1));
- ackFrame.addHeader("receipt", "answer-me");
+ ClientStompFrame ackFrame = conn.createFrame(Stomp.Commands.ACK)
+ .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, "sub1")
+ .addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(Long.valueOf(messageID) + 1))
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, "answer-me");
- ClientStompFrame error = connV11.sendFrame(ackFrame);
+ ClientStompFrame error = conn.sendFrame(ackFrame);
- System.out.println("Receiver error: " + error);
+ IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
- assertEquals("ERROR", error.getCommand());
+ assertEquals(Stomp.Responses.ERROR, error.getCommand());
- assertEquals("answer-me", error.getHeader("receipt-id"));
+ assertEquals("answer-me", error.getHeader(Stomp.Headers.Response.RECEIPT_ID));
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//message should still there
MessageConsumer consumer = session.createConsumer(queue);
@@ -1185,29 +1080,29 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckModeClient() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++) {
- this.sendMessage("client-ack" + i);
+ this.sendJmsMessage("client-ack" + i);
}
ClientStompFrame frame = null;
for (int i = 0; i < num; i++) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
assertNotNull(frame);
}
//ack the last
- this.ack(connV11, "sub1", frame);
+ this.ack(conn, "sub1", frame);
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
@@ -1217,31 +1112,31 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckModeClient2() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++) {
- this.sendMessage("client-ack" + i);
+ this.sendJmsMessage("client-ack" + i);
}
ClientStompFrame frame = null;
for (int i = 0; i < num; i++) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
assertNotNull(frame);
//ack the 49th
if (i == num - 2) {
- this.ack(connV11, "sub1", frame);
+ this.ack(conn, "sub1", frame);
}
}
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
@@ -1253,26 +1148,26 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckModeAuto() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "auto");
+ subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++) {
- this.sendMessage("auto-ack" + i);
+ this.sendJmsMessage("auto-ack" + i);
}
ClientStompFrame frame = null;
for (int i = 0; i < num; i++) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
assertNotNull(frame);
}
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
@@ -1282,32 +1177,32 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testAckModeClientIndividual() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- subscribe(connV11, "sub1", "client-individual");
+ subscribe(conn, "sub1", "client-individual");
int num = 50;
//send a bunch of messages
for (int i = 0; i < num; i++) {
- this.sendMessage("client-individual-ack" + i);
+ this.sendJmsMessage("client-individual-ack" + i);
}
ClientStompFrame frame = null;
for (int i = 0; i < num; i++) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
assertNotNull(frame);
- System.out.println(i + " == received: " + frame);
+ IntegrationTestLogger.LOGGER.info(i + " == received: " + frame);
//ack on even numbers
if (i % 2 == 0) {
- this.ack(connV11, "sub1", frame);
+ this.ack(conn, "sub1", frame);
}
}
- unsubscribe(connV11, "sub1");
+ unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
//no messages can be received.
MessageConsumer consumer = session.createConsumer(queue);
@@ -1316,7 +1211,7 @@ public class StompV11Test extends StompV11TestBase {
for (int i = 0; i < num / 2; i++) {
message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
- System.out.println("Legal: " + message.getText());
+ IntegrationTestLogger.LOGGER.info("Legal: " + message.getText());
}
message = (TextMessage) consumer.receive(1000);
@@ -1326,64 +1221,55 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testTwoSubscribers() throws Exception {
- connV11.connect(defUser, defPass, CLIENT_ID);
+ conn.connect(defUser, defPass, CLIENT_ID);
- this.subscribeTopic(connV11, "sub1", "auto", null);
+ this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
newConn.connect(defUser, defPass, "myclientid2");
- this.subscribeTopic(newConn, "sub2", "auto", null);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getTopicPrefix() + getTopicName());
-
- frame.setBody("Hello World");
+ this.subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
- connV11.sendFrame(frame);
+ send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
// receive message from socket
- frame = connV11.receiveFrame(1000);
+ ClientStompFrame frame = conn.receiveFrame(1000);
- System.out.println("received frame : " + frame);
+ IntegrationTestLogger.LOGGER.info("received frame : " + frame);
assertEquals("Hello World", frame.getBody());
- assertEquals("sub1", frame.getHeader("subscription"));
+ assertEquals("sub1", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
frame = newConn.receiveFrame(1000);
- System.out.println("received 2 frame : " + frame);
+ IntegrationTestLogger.LOGGER.info("received 2 frame : " + frame);
assertEquals("Hello World", frame.getBody());
- assertEquals("sub2", frame.getHeader("subscription"));
+ assertEquals("sub2", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
// remove suscription
- this.unsubscribe(connV11, "sub1", true);
+ this.unsubscribe(conn, "sub1", true);
this.unsubscribe(newConn, "sub2", true);
- connV11.disconnect();
+ conn.disconnect();
newConn.disconnect();
}
@Test
public void testSendAndReceiveOnDifferentConnections() throws Exception {
- connV11.connect(defUser, defPass);
-
- ClientStompFrame sendFrame = connV11.createFrame("SEND");
- sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- sendFrame.setBody("Hello World");
+ conn.connect(defUser, defPass);
- connV11.sendFrame(sendFrame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
connV11_2.connect(defUser, defPass);
- this.subscribe(connV11_2, "sub1", "auto");
+ this.subscribe(connV11_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
ClientStompFrame frame = connV11_2.receiveFrame(2000);
- assertEquals("MESSAGE", frame.getCommand());
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
assertEquals("Hello World", frame.getBody());
- connV11.disconnect();
+ conn.disconnect();
connV11_2.disconnect();
}
@@ -1391,79 +1277,81 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testBeginSameTransactionTwice() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- beginTransaction(connV11, "tx1");
+ beginTransaction(conn, "tx1");
- beginTransaction(connV11, "tx1");
+ beginTransaction(conn, "tx1");
- ClientStompFrame f = connV11.receiveFrame();
- Assert.assertTrue(f.getCommand().equals("ERROR"));
+ ClientStompFrame f = conn.receiveFrame();
+ Assert.assertTrue(f.getCommand().equals(Stomp.Responses.ERROR));
}
@Test
public void testBodyWithUTF8() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, getName(), "auto");
+ this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.AUTO);
String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
- System.out.println(text);
- sendMessage(text);
+ IntegrationTestLogger.LOGGER.info(text);
+ sendJmsMessage(text);
- ClientStompFrame frame = connV11.receiveFrame();
- System.out.println(frame);
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
+ ClientStompFrame frame = conn.receiveFrame();
+ IntegrationTestLogger.LOGGER.info(frame);
+ Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
+ Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION));
Assert.assertTrue(frame.getBody().equals(text));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testClientAckNotPartOfTransaction() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, getName(), "client");
+ this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
+ Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION));
Assert.assertTrue(frame.getBody().equals(getName()));
- Assert.assertNotNull(frame.getHeader("message-id"));
+ Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
- String messageID = frame.getHeader("message-id");
+ String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
- beginTransaction(connV11, "tx1");
+ beginTransaction(conn, "tx1");
- this.ack(connV11, getName(), messageID, "tx1");
+ this.ack(conn, getName(), messageID, "tx1");
- abortTransaction(connV11, "tx1");
+ abortTransaction(conn, "tx1");
- frame = connV11.receiveFrame(500);
+ frame = conn.receiveFrame(500);
assertNull(frame);
- this.unsubscribe(connV11, getName());
+ this.unsubscribe(conn, getName());
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testDisconnectAndError() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
+
+ this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- this.subscribe(connV11, getName(), "client");
+ String uuid = UUID.randomUUID().toString();
- ClientStompFrame frame = connV11.createFrame("DISCONNECT");
- frame.addHeader("receipt", "1");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.DISCONNECT)
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
- ClientStompFrame result = connV11.sendFrame(frame);
+ ClientStompFrame result = conn.sendFrame(frame);
- if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) {
+ if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
fail("Disconnect failed! " + result);
}
@@ -1472,12 +1360,9 @@ public class StompV11Test extends StompV11TestBase {
Thread thr = new Thread() {
@Override
public void run() {
- ClientStompFrame sendFrame = connV11.createFrame("SEND");
- sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- sendFrame.setBody("Hello World");
while (latch.getCount() != 0) {
try {
- connV11.sendFrame(sendFrame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
Thread.sleep(500);
} catch (InterruptedException e) {
//retry
@@ -1490,7 +1375,7 @@ public class StompV11Test extends StompV11TestBase {
latch.countDown();
break;
} finally {
- connV11.destroy();
+ conn.destroy();
}
}
}
@@ -1510,66 +1395,68 @@ public class StompV11Test extends StompV11TestBase {
@Test
public void testDurableSubscriber() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, "sub1", "client", getName());
+ this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName());
- this.subscribe(connV11, "sub1", "client", getName());
+ this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName());
- ClientStompFrame frame = connV11.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("ERROR"));
+ ClientStompFrame frame = conn.receiveFrame();
+ Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testDurableSubscriberWithReconnection() throws Exception {
- connV11.connect(defUser, defPass, CLIENT_ID);
+ conn.connect(defUser, defPass, CLIENT_ID);
+
+ this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
- this.subscribeTopic(connV11, "sub1", "auto", getName());
+ String uuid = UUID.randomUUID().toString();
- ClientStompFrame frame = connV11.createFrame("DISCONNECT");
- frame.addHeader("receipt", "1");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.DISCONNECT)
+ .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
- ClientStompFrame result = connV11.sendFrame(frame);
+ ClientStompFrame result = conn.sendFrame(frame);
- if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) {
+ if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
fail("Disconnect failed! " + result);
}
// send the message when the durable subscriber is disconnected
- sendMessage(getName(), topic);
+ sendJmsMessage(getName(), topic);
- connV11.destroy();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- connV11.connect(defUser, defPass, CLIENT_ID);
+ conn.destroy();
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ conn.connect(defUser, defPass, CLIENT_ID);
- this.subscribeTopic(connV11, "sub1", "auto", getName());
+ this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
// we must have received the message
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
- Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
- Assert.assertNotNull(frame.getHeader("destination"));
+ Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
+ Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION));
Assert.assertEquals(getName(), frame.getBody());
- this.unsubscribe(connV11, "sub1");
+ this.unsubscribe(conn, "sub1");
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testDurableUnSubscribe() throws Exception {
- connV11.connect(defUser, defPass, CLIENT_ID);
+ conn.connect(defUser, defPass, CLIENT_ID);
- this.subscribeTopic(connV11, null, "auto", getName());
+ this.subscribeTopic(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
- connV11.disconnect();
- connV11.destroy();
- connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
- connV11.connect(defUser, defPass, CLIENT_ID);
+ conn.disconnect();
+ conn.destroy();
+ conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+ conn.connect(defUser, defPass, CLIENT_ID);
- this.unsubscribe(connV11, getName(), false, true);
+ this.unsubscribe(conn, getName(), null, false, true);
long start = System.currentTimeMillis();
SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
@@ -1579,21 +1466,21 @@ public class StompV11Test extends StompV11TestBase {
assertNull(server.getActiveMQServer().locateQueue(queueName));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("JMSXGroupID", "TEST");
- frame.setBody("Hello World");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader("JMSXGroupID", "TEST")
+ .setBody("Hello World");
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1607,64 +1494,64 @@ public class StompV11Test extends StompV11TestBase {
int ctr = 10;
String[] data = new String[ctr];
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, "sub1", "auto");
+ this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
for (int i = 0; i < ctr; ++i) {
data[i] = getName() + i;
- sendMessage(data[i]);
+ sendJmsMessage(data[i]);
}
ClientStompFrame frame = null;
for (int i = 0; i < ctr; ++i) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
}
for (int i = 0; i < ctr; ++i) {
data[i] = getName() + ":second:" + i;
- sendMessage(data[i]);
+ sendJmsMessage(data[i]);
}
for (int i = 0; i < ctr; ++i) {
- frame = connV11.receiveFrame();
+ frame = conn.receiveFrame();
Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
}
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testSubscribeWithAutoAckAndSelector() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, "sub1", "auto", null, "foo = 'zzz'");
+ this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'");
- sendMessage("Ignored message", "foo", "1234");
- sendMessage("Real message", "foo", "zzz");
+ sendJmsMessage("Ignored message", "foo", "1234");
+ sendJmsMessage("Real message", "foo", "zzz");
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message"));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testRedeliveryWithClientAck() throws Exception {
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- this.subscribe(connV11, "subId", "client");
+ this.subscribe(conn, "subscriptionId", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
- sendMessage(getName());
+ sendJmsMessage(getName());
- ClientStompFrame frame = connV11.receiveFrame();
+ ClientStompFrame frame = conn.receiveFrame();
- assertTrue(frame.getCommand().equals("MESSAGE"));
+ assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
- connV11.disconnect();
+ conn.disconnect();
// message should be received since message was not acknowledged
MessageConsumer consumer = session.createConsumer(queue);
@@ -1677,7 +1564,7 @@ public class StompV11Test extends StompV11TestBase {
public void testSendManyMessages() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
int count = 1000;
final CountDownLatch latch = new CountDownLatch(count);
@@ -1688,30 +1575,22 @@ public class StompV11Test extends StompV11TestBase {
}
});
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
for (int i = 1; i <= count; i++) {
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
}
assertTrue(latch.await(60, TimeUnit.SECONDS));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testSendMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
+ conn.connect(defUser, defPass);
- connV11.sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1730,18 +1609,16 @@ public class StompV11Test extends StompV11TestBase {
public void testSendMessageWithContentLength() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
byte[] data = new byte[]{1, 0, 0, 4};
- ClientStompFrame frame = connV11.createFrame("SEND");
-
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody(new String(data, StandardCharsets.UTF_8));
-
- frame.addHeader("content-length", String.valueOf(data.length));
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .setBody(new String(data, StandardCharsets.UTF_8))
+ .addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
BytesMessage message = (BytesMessage) consumer.receive(10000);
Assert.assertNotNull(message);
@@ -1757,16 +1634,15 @@ public class StompV11Test extends StompV11TestBase {
public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("foo", "abc");
- frame.addHeader("bar", "123");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader("foo", "abc")
+ .addHeader("bar", "123")
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .setBody("Hello World");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1779,14 +1655,13 @@ public class StompV11Test extends StompV11TestBase {
public void testSendMessageWithLeadingNewLine() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
+ conn.connect(defUser, defPass);
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.setBody("Hello World");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .setBody("Hello World");
- connV11.sendWickedFrame(frame);
+ conn.sendWickedFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1800,24 +1675,17 @@ public class StompV11Test extends StompV11TestBase {
assertNull(consumer.receive(1000));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testSendMessageWithReceipt() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("receipt", "1234");
- frame.setBody("Hello World");
- frame = connV11.sendFrame(frame);
-
- assertTrue(frame.getCommand().equals("RECEIPT"));
- assertEquals("1234", frame.getHeader("receipt-id"));
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1829,28 +1697,27 @@ public class StompV11Test extends StompV11TestBase {
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testSendMessageWithStandardHeaders() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
-
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("correlation-id", "c123");
- frame.addHeader("persistent", "true");
- frame.addHeader("priority", "3");
- frame.addHeader("type", "t345");
- frame.addHeader("JMSXGroupID", "abc");
- frame.addHeader("foo", "abc");
- frame.addHeader("bar", "123");
+ conn.connect(defUser, defPass);
- frame.setBody("Hello World");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+ .addHeader(Stomp.Headers.Message.CORRELATION_ID, "c123")
+ .addHeader(Stomp.Headers.Message.PERSISTENT, "true")
+ .addHeader(Stomp.Headers.Message.PRIORITY, "3")
+ .addHeader(Stomp.Headers.Message.TYPE, "t345")
+ .addHeader("JMSXGroupID", "abc")
+ .addHeader("foo", "abc")
+ .addHeader("bar", "123")
+ .setBody("Hello World");
- frame = connV11.sendFrame(frame);
+ frame = conn.sendFrame(frame);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -1864,33 +1731,32 @@ public class StompV11Test extends StompV11TestBase {
Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
- connV11.disconnect();
+ conn.disconnect();
}
@Test
public void testSendMessageWithLongHeaders() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
- connV11.connect(defUser, defPass);
+ conn.connect(defUser, defPass);
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < 2048; i++) {
buffer.append("a");
}
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("cor
<TRUNCATED>