You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/11 19:23:34 UTC

[04/11] activemq-artemis git commit: Stomp refactor + track autocreation for addresses

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
index da69d96..e8a6f25 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/StompV11Test.java
@@ -32,8 +32,10 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
 import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
@@ -46,28 +48,28 @@ import org.junit.Test;
 /*
  *
  */
-public class StompV11Test extends StompV11TestBase {
+public class StompV11Test extends StompTestBase {
 
    private static final transient IntegrationTestLogger log = IntegrationTestLogger.LOGGER;
    public static final String CLIENT_ID = "myclientid";
 
-   private StompClientConnection connV11;
+   private StompClientConnection conn;
 
    @Override
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
    }
 
    @Override
    @After
    public void tearDown() throws Exception {
       try {
-         boolean connected = connV11 != null && connV11.isConnected();
-         log.debug("Connection 11 : " + connected);
+         boolean connected = conn != null && conn.isConnected();
+         log.debug("Connection 1.1 : " + connected);
          if (connected) {
-            connV11.disconnect();
+            conn.disconnect();
          }
       } finally {
          super.tearDown();
@@ -115,276 +117,234 @@ public class StompV11Test extends StompV11TestBase {
       conn = (StompClientConnectionV11) StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
       ClientStompFrame frame = conn.connect("invaliduser", defPass);
       assertFalse(conn.isConnected());
-      assertTrue("ERROR".equals(frame.getCommand()));
+      assertTrue(Stomp.Responses.ERROR.equals(frame.getCommand()));
       assertTrue(frame.getBody().contains("Security Error occurred"));
    }
 
    @Test
    public void testNegotiation() throws Exception {
       // case 1 accept-version absent. It is a 1.0 connect
-      ClientStompFrame frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+                                   .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                                   .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                                   .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
 
-      ClientStompFrame reply = connV11.sendFrame(frame);
+      ClientStompFrame reply = conn.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
       //reply headers: version, session, server
       assertEquals(null, reply.getHeader("version"));
 
-      connV11.disconnect();
+      conn.disconnect();
 
       // case 2 accept-version=1.0, result: 1.0
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("accept-version", "1.0");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
+      conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = conn.createFrame(Stomp.Commands.CONNECT)
+                  .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0")
+                  .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                  .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                  .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
 
-      reply = connV11.sendFrame(frame);
+      reply = conn.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
       //reply headers: version, session, server
       assertEquals("1.0", reply.getHeader("version"));
 
-      connV11.disconnect();
+      conn.disconnect();
 
       // case 3 accept-version=1.1, result: 1.1
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("accept-version", "1.1");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
+      conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = conn.createFrame(Stomp.Commands.CONNECT)
+                  .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.1")
+                  .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                  .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                  .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
 
-      reply = connV11.sendFrame(frame);
+      reply = conn.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
       //reply headers: version, session, server
       assertEquals("1.1", reply.getHeader("version"));
 
-      connV11.disconnect();
+      conn.disconnect();
 
       // case 4 accept-version=1.0,1.1,1.2, result 1.1
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("accept-version", "1.0,1.1,1.3");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
+      conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = conn.createFrame(Stomp.Commands.CONNECT)
+                  .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1,1.3")
+                  .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                  .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                  .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
 
-      reply = connV11.sendFrame(frame);
+      reply = conn.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
       //reply headers: version, session, server
       assertEquals("1.1", reply.getHeader("version"));
 
-      connV11.disconnect();
+      conn.disconnect();
 
       // case 5 accept-version=1.2, result error
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("accept-version", "1.3");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
+      conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = conn.createFrame(Stomp.Commands.CONNECT)
+                  .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.3")
+                  .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                  .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                  .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
 
-      reply = connV11.sendFrame(frame);
+      reply = conn.sendFrame(frame);
 
-      assertEquals("ERROR", reply.getCommand());
+      assertEquals(Stomp.Responses.ERROR, reply.getCommand());
 
-      System.out.println("Got error frame " + reply);
+      IntegrationTestLogger.LOGGER.info("Got error frame " + reply);
 
    }
 
    @Test
    public void testSendAndReceive() throws Exception {
-      connV11.connect(defUser, defPass);
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      frame.setBody("Hello World 1!");
+      conn.connect(defUser, defPass);
 
-      ClientStompFrame response = connV11.sendFrame(frame);
+      ClientStompFrame response = send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 1!");
 
       assertNull(response);
 
-      frame.addHeader("receipt", "1234");
-      frame.setBody("Hello World 2!");
-
-      response = connV11.sendFrame(frame);
-
-      assertNotNull(response);
+      String uuid = UUID.randomUUID().toString();
 
-      assertEquals("RECEIPT", response.getCommand());
-
-      assertEquals("1234", response.getHeader("receipt-id"));
+      response = send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World 2!", true);
 
       //subscribe
       StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
       newConn.connect(defUser, defPass);
 
-      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-
-      newConn.sendFrame(subFrame);
+      subscribe(newConn, "a-sub");
 
-      frame = newConn.receiveFrame();
+      ClientStompFrame frame = newConn.receiveFrame();
 
-      System.out.println("received " + frame);
+      IntegrationTestLogger.LOGGER.info("received " + frame);
 
-      assertEquals("MESSAGE", frame.getCommand());
+      assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
 
-      assertEquals("a-sub", frame.getHeader("subscription"));
+      assertEquals("a-sub", frame.getHeader(Stomp.Headers.Ack.SUBSCRIPTION));
 
-      assertNotNull(frame.getHeader("message-id"));
+      assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
 
-      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Message.DESTINATION));
 
       assertEquals("Hello World 1!", frame.getBody());
 
       frame = newConn.receiveFrame();
 
-      System.out.println("received " + frame);
+      IntegrationTestLogger.LOGGER.info("received " + frame);
 
-      //unsub
-      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      newConn.sendFrame(unsubFrame);
+      unsubscribe(newConn, "a-sub");
 
       newConn.disconnect();
    }
 
    @Test
    public void testHeaderContentType() throws Exception {
-      connV11.connect(defUser, defPass);
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "application/xml");
-      frame.setBody("Hello World 1!");
-
-      connV11.sendFrame(frame);
+      conn.connect(defUser, defPass);
+      send(conn, getQueuePrefix() + getQueueName(), "application/xml", "Hello World 1!");
 
       //subscribe
       StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
       newConn.connect(defUser, defPass);
 
-      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
+      subscribe(newConn, "a-sub");
 
-      newConn.sendFrame(subFrame);
+      ClientStompFrame frame = newConn.receiveFrame();
 
-      frame = newConn.receiveFrame();
-
-      System.out.println("received " + frame);
+      IntegrationTestLogger.LOGGER.info("received " + frame);
 
-      assertEquals("MESSAGE", frame.getCommand());
+      assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
 
-      assertEquals("application/xml", frame.getHeader("content-type"));
+      assertEquals("application/xml", frame.getHeader(Stomp.Headers.CONTENT_TYPE));
 
       //unsub
-      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
+      unsubscribe(newConn, "a-sub");
 
       newConn.disconnect();
    }
 
    @Test
    public void testHeaderContentLength() throws Exception {
-      connV11.connect(defUser, defPass);
-      ClientStompFrame frame = connV11.createFrame("SEND");
+      conn.connect(defUser, defPass);
 
       String body = "Hello World 1!";
       String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length);
 
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "application/xml");
-      frame.addHeader("content-length", cLen);
-      frame.setBody(body + "extra");
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml")
+                                   .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen)
+                                   .setBody(body + "extra");
 
-      connV11.sendFrame(frame);
+      conn.sendFrame(frame);
 
       //subscribe
       StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
       newConn.connect(defUser, defPass);
 
-      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-
-      newConn.sendFrame(subFrame);
+      subscribe(newConn, "a-sub");
 
       frame = newConn.receiveFrame();
 
-      System.out.println("received " + frame);
+      IntegrationTestLogger.LOGGER.info("received " + frame);
 
-      assertEquals("MESSAGE", frame.getCommand());
+      assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
 
-      assertEquals(cLen, frame.getHeader("content-length"));
+      assertEquals(cLen, frame.getHeader(Stomp.Headers.CONTENT_LENGTH));
 
       //unsub
-      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
+      unsubscribe(newConn, "a-sub");
 
       newConn.disconnect();
    }
 
    @Test
    public void testHeaderEncoding() throws Exception {
-      connV11.connect(defUser, defPass);
-      ClientStompFrame frame = connV11.createFrame("SEND");
+      conn.connect(defUser, defPass);
 
       String body = "Hello World 1!";
       String cLen = String.valueOf(body.getBytes(StandardCharsets.UTF_8).length);
-
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "application/xml");
-      frame.addHeader("content-length", cLen);
       String hKey = "special-header\\\\\\n\\c";
       String hVal = "\\c\\\\\\ngood";
-      frame.addHeader(hKey, hVal);
 
-      System.out.println("key: |" + hKey + "| val: |" + hVal + "|");
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader(Stomp.Headers.CONTENT_TYPE, "application/xml")
+                                   .addHeader(Stomp.Headers.CONTENT_LENGTH, cLen)
+                                   .addHeader(hKey, hVal);
+
+      IntegrationTestLogger.LOGGER.info("key: |" + hKey + "| val: |" + hVal + "|");
 
       frame.setBody(body);
 
-      connV11.sendFrame(frame);
+      conn.sendFrame(frame);
 
       //subscribe
       StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
       newConn.connect(defUser, defPass);
 
-      ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-
-      newConn.sendFrame(subFrame);
+      subscribe(newConn, "a-sub");
 
       frame = newConn.receiveFrame();
 
-      System.out.println("received " + frame);
+      IntegrationTestLogger.LOGGER.info("received " + frame);
 
-      assertEquals("MESSAGE", frame.getCommand());
+      assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
 
       String value = frame.getHeader("special-header" + "\\" + "\n" + ":");
 
       assertEquals(":" + "\\" + "\n" + "good", value);
 
       //unsub
-      ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
+      unsubscribe(newConn, "a-sub");
 
       newConn.disconnect();
    }
@@ -392,106 +352,96 @@ public class StompV11Test extends StompV11TestBase {
    @Test
    public void testHeartBeat() throws Exception {
       //no heart beat at all if heat-beat absent
-      ClientStompFrame frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+                                   .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                                   .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                                   .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
 
-      ClientStompFrame reply = connV11.sendFrame(frame);
+      ClientStompFrame reply = conn.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
       Thread.sleep(5000);
 
-      assertEquals(0, connV11.getFrameQueueSize());
+      assertEquals(0, conn.getFrameQueueSize());
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //default heart beat for (0,0) which is default connection TTL (60000) / default heartBeatToTtlModifier (2.0) = 30000
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "0,0");
-      frame.addHeader("accept-version", "1.0,1.1");
+      conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = conn.createFrame(Stomp.Commands.CONNECT)
+                  .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                  .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                  .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                  .addHeader(Stomp.Headers.Connect.HEART_BEAT, "0,0")
+                  .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
-      reply = connV11.sendFrame(frame);
+      reply = conn.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
-      assertEquals("0,30000", reply.getHeader("heart-beat"));
+      assertEquals("0,30000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
 
       Thread.sleep(5000);
 
-      assertEquals(0, connV11.getFrameQueueSize());
+      assertEquals(0, conn.getFrameQueueSize());
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //heart-beat (1,0), should receive a min client ping accepted by server
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "1,0");
-      frame.addHeader("accept-version", "1.0,1.1");
+      conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = conn.createFrame(Stomp.Commands.CONNECT)
+                  .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                  .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                  .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                  .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+                  .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
-      reply = connV11.sendFrame(frame);
+      reply = conn.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
-      assertEquals("0,500", reply.getHeader("heart-beat"));
+      assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
 
       Thread.sleep(2000);
 
       //now server side should be disconnected because we didn't send ping for 2 sec
-      frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      frame.setBody("Hello World");
-
       //send will fail
       try {
-         connV11.sendFrame(frame);
+         send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
          fail("connection should have been destroyed by now");
       } catch (IOException e) {
          //ignore
       }
 
       //heart-beat (1,0), start a ping, then send a message, should be ok.
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "1,0");
-      frame.addHeader("accept-version", "1.0,1.1");
+      conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = conn.createFrame(Stomp.Commands.CONNECT)
+                  .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                  .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                  .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                  .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+                  .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
-      reply = connV11.sendFrame(frame);
+      reply = conn.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
-      assertEquals("0,500", reply.getHeader("heart-beat"));
+      assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
 
-      System.out.println("========== start pinger!");
+      IntegrationTestLogger.LOGGER.info("========== start pinger!");
 
-      connV11.startPinger(500);
+      conn.startPinger(500);
 
       Thread.sleep(2000);
 
       //now server side should be disconnected because we didn't send ping for 2 sec
-      frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      frame.setBody("Hello World");
-
       //send will be ok
-      connV11.sendFrame(frame);
+      send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
 
-      connV11.stopPinger();
+      conn.stopPinger();
 
-      connV11.disconnect();
+      conn.disconnect();
 
    }
 
@@ -499,82 +449,72 @@ public class StompV11Test extends StompV11TestBase {
    @Test
    public void testHeartBeat2() throws Exception {
       //heart-beat (1,1)
-      ClientStompFrame frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "1,1");
-      frame.addHeader("accept-version", "1.0,1.1");
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+                                   .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                                   .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                                   .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                                   .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,1")
+                                   .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
-      ClientStompFrame reply = connV11.sendFrame(frame);
+      ClientStompFrame reply = conn.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
-      assertEquals("500,500", reply.getHeader("heart-beat"));
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
+      assertEquals("500,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //heart-beat (500,1000)
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      frame = connV11.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "500,1000");
-      frame.addHeader("accept-version", "1.0,1.1");
+      conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      frame = conn.createFrame(Stomp.Commands.CONNECT)
+                  .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                  .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                  .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                  .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+                  .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
-      reply = connV11.sendFrame(frame);
+      reply = conn.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
-      assertEquals("1000,500", reply.getHeader("heart-beat"));
+      assertEquals("1000,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
 
-      System.out.println("========== start pinger!");
+      IntegrationTestLogger.LOGGER.info("========== start pinger!");
 
-      connV11.startPinger(500);
+      conn.startPinger(500);
 
       Thread.sleep(10000);
 
       //now check the frame size
-      int size = connV11.getServerPingNumber();
+      int size = conn.getServerPingNumber();
 
-      System.out.println("ping received: " + size);
+      IntegrationTestLogger.LOGGER.info("ping received: " + size);
 
       assertTrue(size > 5);
 
       //now server side should be disconnected because we didn't send ping for 2 sec
-      frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      frame.setBody("Hello World");
-
       //send will be ok
-      connV11.sendFrame(frame);
+      send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testSendWithHeartBeatsAndReceive() throws Exception {
       StompClientConnection newConn = null;
       try {
-         ClientStompFrame frame = connV11.createFrame("CONNECT");
-         frame.addHeader("host", "127.0.0.1");
-         frame.addHeader("login", this.defUser);
-         frame.addHeader("passcode", this.defPass);
-         frame.addHeader("heart-beat", "500,1000");
-         frame.addHeader("accept-version", "1.0,1.1");
+         ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+                                      .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                                      .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                                      .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                                      .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+                                      .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
-         connV11.sendFrame(frame);
+         conn.sendFrame(frame);
 
-         connV11.startPinger(500);
-
-         frame = connV11.createFrame("SEND");
-         frame.addHeader("destination", getQueuePrefix() + getQueueName());
-         frame.addHeader("content-type", "text/plain");
+         conn.startPinger(500);
 
          for (int i = 0; i < 10; i++) {
-            frame.setBody("Hello World " + i + "!");
-            connV11.sendFrame(frame);
+            send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!");
             Thread.sleep(500);
          }
 
@@ -582,12 +522,7 @@ public class StompV11Test extends StompV11TestBase {
          newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
          newConn.connect(defUser, defPass);
 
-         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-         subFrame.addHeader("id", "a-sub");
-         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-         subFrame.addHeader("ack", "auto");
-
-         newConn.sendFrame(subFrame);
+         subscribe(newConn, "a-sub");
 
          int cnt = 0;
 
@@ -602,38 +537,32 @@ public class StompV11Test extends StompV11TestBase {
          assertEquals(10, cnt);
 
          // unsub
-         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-         unsubFrame.addHeader("id", "a-sub");
-         newConn.sendFrame(unsubFrame);
+         unsubscribe(newConn, "a-sub");
       } finally {
          if (newConn != null)
             newConn.disconnect();
-         connV11.disconnect();
+         conn.disconnect();
       }
    }
 
    @Test
    public void testSendAndReceiveWithHeartBeats() throws Exception {
-      connV11.connect(defUser, defPass);
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
+      conn.connect(defUser, defPass);
 
       for (int i = 0; i < 10; i++) {
-         frame.setBody("Hello World " + i + "!");
-         connV11.sendFrame(frame);
+         send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!");
          Thread.sleep(500);
       }
 
       //subscribe
       StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
       try {
-         frame = newConn.createFrame("CONNECT");
-         frame.addHeader("host", "127.0.0.1");
-         frame.addHeader("login", this.defUser);
-         frame.addHeader("passcode", this.defPass);
-         frame.addHeader("heart-beat", "500,1000");
-         frame.addHeader("accept-version", "1.0,1.1");
+         ClientStompFrame frame = newConn.createFrame(Stomp.Commands.CONNECT)
+                        .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                        .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                        .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                        .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+                        .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
          newConn.sendFrame(frame);
 
@@ -641,12 +570,7 @@ public class StompV11Test extends StompV11TestBase {
 
          Thread.sleep(500);
 
-         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-         subFrame.addHeader("id", "a-sub");
-         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-         subFrame.addHeader("ack", "auto");
-
-         newConn.sendFrame(subFrame);
+         subscribe(newConn, "a-sub");
 
          int cnt = 0;
 
@@ -661,9 +585,7 @@ public class StompV11Test extends StompV11TestBase {
          assertEquals(10, cnt);
 
          // unsub
-         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-         unsubFrame.addHeader("id", "a-sub");
-         newConn.sendFrame(unsubFrame);
+         unsubscribe(newConn, "a-sub");
       } finally {
          newConn.disconnect();
       }
@@ -673,35 +595,30 @@ public class StompV11Test extends StompV11TestBase {
    public void testSendWithHeartBeatsAndReceiveWithHeartBeats() throws Exception {
       StompClientConnection newConn = null;
       try {
-         ClientStompFrame frame = connV11.createFrame("CONNECT");
-         frame.addHeader("host", "127.0.0.1");
-         frame.addHeader("login", this.defUser);
-         frame.addHeader("passcode", this.defPass);
-         frame.addHeader("heart-beat", "500,1000");
-         frame.addHeader("accept-version", "1.0,1.1");
+         ClientStompFrame frame = conn.createFrame(Stomp.Commands.CONNECT)
+                                      .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                                      .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                                      .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                                      .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+                                      .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
-         connV11.sendFrame(frame);
+         conn.sendFrame(frame);
 
-         connV11.startPinger(500);
-
-         frame = connV11.createFrame("SEND");
-         frame.addHeader("destination", getQueuePrefix() + getQueueName());
-         frame.addHeader("content-type", "text/plain");
+         conn.startPinger(500);
 
          for (int i = 0; i < 10; i++) {
-            frame.setBody("Hello World " + i + "!");
-            connV11.sendFrame(frame);
+            send(conn, getQueuePrefix() + getQueueName(), "text/plain", "Hello World " + i + "!");
             Thread.sleep(500);
          }
 
          // subscribe
          newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-         frame = newConn.createFrame("CONNECT");
-         frame.addHeader("host", "127.0.0.1");
-         frame.addHeader("login", this.defUser);
-         frame.addHeader("passcode", this.defPass);
-         frame.addHeader("heart-beat", "500,1000");
-         frame.addHeader("accept-version", "1.0,1.1");
+         frame = newConn.createFrame(Stomp.Commands.CONNECT)
+                        .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                        .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                        .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                        .addHeader(Stomp.Headers.Connect.HEART_BEAT, "500,1000")
+                        .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
          newConn.sendFrame(frame);
 
@@ -709,12 +626,7 @@ public class StompV11Test extends StompV11TestBase {
 
          Thread.sleep(500);
 
-         ClientStompFrame subFrame = newConn.createFrame("SUBSCRIBE");
-         subFrame.addHeader("id", "a-sub");
-         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-         subFrame.addHeader("ack", "auto");
-
-         newConn.sendFrame(subFrame);
+         subscribe(newConn, "a-sub");
 
          int cnt = 0;
 
@@ -728,13 +640,11 @@ public class StompV11Test extends StompV11TestBase {
          assertEquals(10, cnt);
 
          // unsub
-         ClientStompFrame unsubFrame = newConn.createFrame("UNSUBSCRIBE");
-         unsubFrame.addHeader("id", "a-sub");
-         newConn.sendFrame(unsubFrame);
+         unsubscribe(newConn, "a-sub");
       } finally {
          if (newConn != null)
             newConn.disconnect();
-         connV11.disconnect();
+         conn.disconnect();
       }
    }
 
@@ -748,14 +658,14 @@ public class StompV11Test extends StompV11TestBase {
       StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
 
       //no heart beat at all if heat-beat absent
-      frame = connection.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
+      frame = connection.createFrame(Stomp.Commands.CONNECT)
+                        .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                        .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                        .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass);
 
       reply = connection.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
       Thread.sleep(3000);
 
@@ -772,20 +682,20 @@ public class StompV11Test extends StompV11TestBase {
 
       //no heart beat for (0,0)
       connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
-      frame = connection.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "0,0");
-      frame.addHeader("accept-version", "1.0,1.1");
+      frame = connection.createFrame(Stomp.Commands.CONNECT)
+                        .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                        .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                        .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                        .addHeader(Stomp.Headers.Connect.HEART_BEAT, "0,0")
+                        .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
       reply = connection.sendFrame(frame);
 
       IntegrationTestLogger.LOGGER.info("Reply: " + reply);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
-      assertEquals("0,500", reply.getHeader("heart-beat"));
+      assertEquals("0,500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
 
       Thread.sleep(3000);
 
@@ -802,30 +712,25 @@ public class StompV11Test extends StompV11TestBase {
 
       //heart-beat (1,0), should receive a min client ping accepted by server
       connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
-      frame = connection.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "1,0");
-      frame.addHeader("accept-version", "1.0,1.1");
+      frame = connection.createFrame(Stomp.Commands.CONNECT)
+                        .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                        .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                        .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                        .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+                        .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
       reply = connection.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
-      assertEquals("0,2500", reply.getHeader("heart-beat"));
+      assertEquals("0,2500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
 
       Thread.sleep(7000);
 
       //now server side should be disconnected because we didn't send ping for 2 sec
-      frame = connection.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      frame.setBody("Hello World");
-
       //send will fail
       try {
-         connection.sendFrame(frame);
+         send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
          fail("connection should have been destroyed by now");
       } catch (IOException e) {
          //ignore
@@ -833,33 +738,28 @@ public class StompV11Test extends StompV11TestBase {
 
       //heart-beat (1,0), start a ping, then send a message, should be ok.
       connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
-      frame = connection.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "1,0");
-      frame.addHeader("accept-version", "1.0,1.1");
+      frame = connection.createFrame(Stomp.Commands.CONNECT)
+                        .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                        .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                        .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                        .addHeader(Stomp.Headers.Connect.HEART_BEAT, "1,0")
+                        .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
       reply = connection.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
-      assertEquals("0,2500", reply.getHeader("heart-beat"));
+      assertEquals("0,2500", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
 
-      System.out.println("========== start pinger!");
+      IntegrationTestLogger.LOGGER.info("========== start pinger!");
 
       connection.startPinger(2500);
 
       Thread.sleep(7000);
 
       //now server side should be disconnected because we didn't send ping for 2 sec
-      frame = connection.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      frame.setBody("Hello World");
-
       //send will be ok
-      connection.sendFrame(frame);
+      send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
 
       connection.stopPinger();
 
@@ -867,30 +767,25 @@ public class StompV11Test extends StompV11TestBase {
 
       //heart-beat (20000,0), should receive a max client ping accepted by server
       connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
-      frame = connection.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "20000,0");
-      frame.addHeader("accept-version", "1.0,1.1");
+      frame = connection.createFrame(Stomp.Commands.CONNECT)
+                        .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                        .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                        .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                        .addHeader(Stomp.Headers.Connect.HEART_BEAT, "20000,0")
+                        .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
       reply = connection.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
-      assertEquals("0,5000", reply.getHeader("heart-beat"));
+      assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
 
       Thread.sleep(12000);
 
       //now server side should be disconnected because we didn't send ping for 2 sec
-      frame = connection.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-type", "text/plain");
-      frame.setBody("Hello World");
-
       //send will fail
       try {
-         connection.sendFrame(frame);
+         send(connection, getQueuePrefix() + getQueueName(), "text/plain", "Hello World");
          fail("connection should have been destroyed by now");
       } catch (IOException e) {
          //ignore
@@ -907,18 +802,18 @@ public class StompV11Test extends StompV11TestBase {
       server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1").start();
 
       connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
-      frame = connection.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "5000,0");
-      frame.addHeader("accept-version", "1.0,1.1");
+      frame = connection.createFrame(Stomp.Commands.CONNECT)
+                        .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                        .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                        .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                        .addHeader(Stomp.Headers.Connect.HEART_BEAT, "5000,0")
+                        .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
       reply = connection.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
-      assertEquals("0,5000", reply.getHeader("heart-beat"));
+      assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
 
       Thread.sleep(6000);
 
@@ -933,18 +828,18 @@ public class StompV11Test extends StompV11TestBase {
       server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://127.0.0.1:" + port + "?heartBeatToConnectionTtlModifier=1.5").start();
 
       connection = StompClientConnectionFactory.createClientConnection("1.1", "localhost", port);
-      frame = connection.createFrame("CONNECT");
-      frame.addHeader("host", "127.0.0.1");
-      frame.addHeader("login", this.defUser);
-      frame.addHeader("passcode", this.defPass);
-      frame.addHeader("heart-beat", "5000,0");
-      frame.addHeader("accept-version", "1.0,1.1");
+      frame = connection.createFrame(Stomp.Commands.CONNECT)
+                        .addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1")
+                        .addHeader(Stomp.Headers.Connect.LOGIN, this.defUser)
+                        .addHeader(Stomp.Headers.Connect.PASSCODE, this.defPass)
+                        .addHeader(Stomp.Headers.Connect.HEART_BEAT, "5000,0")
+                        .addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
 
       reply = connection.sendFrame(frame);
 
-      assertEquals("CONNECTED", reply.getCommand());
+      assertEquals(Stomp.Responses.CONNECTED, reply.getCommand());
 
-      assertEquals("0,5000", reply.getHeader("heart-beat"));
+      assertEquals("0,5000", reply.getHeader(Stomp.Headers.Connect.HEART_BEAT));
 
       Thread.sleep(6000);
 
@@ -953,21 +848,21 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testNack() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "client");
+      subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      sendMessage(getName());
+      sendJmsMessage(getName());
 
-      ClientStompFrame frame = connV11.receiveFrame();
+      ClientStompFrame frame = conn.receiveFrame();
 
-      String messageID = frame.getHeader("message-id");
+      String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
 
-      nack(connV11, "sub1", messageID);
+      nack(conn, "sub1", messageID);
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //Nack makes the message be dropped.
       MessageConsumer consumer = session.createConsumer(queue);
@@ -977,25 +872,25 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testNackWithWrongSubId() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "client");
+      subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      sendMessage(getName());
+      sendJmsMessage(getName());
 
-      ClientStompFrame frame = connV11.receiveFrame();
+      ClientStompFrame frame = conn.receiveFrame();
 
-      String messageID = frame.getHeader("message-id");
+      String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
 
-      nack(connV11, "sub2", messageID);
+      nack(conn, "sub2", messageID);
 
-      ClientStompFrame error = connV11.receiveFrame();
+      ClientStompFrame error = conn.receiveFrame();
 
-      System.out.println("Receiver error: " + error);
+      IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //message should be still there
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1005,25 +900,25 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testNackWithWrongMessageId() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "client");
+      subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      sendMessage(getName());
+      sendJmsMessage(getName());
 
-      ClientStompFrame frame = connV11.receiveFrame();
+      ClientStompFrame frame = conn.receiveFrame();
 
-      frame.getHeader("message-id");
+      frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
 
-      nack(connV11, "sub2", "someother");
+      nack(conn, "sub2", "someother");
 
-      ClientStompFrame error = connV11.receiveFrame();
+      ClientStompFrame error = conn.receiveFrame();
 
-      System.out.println("Receiver error: " + error);
+      IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //message should still there
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1033,21 +928,21 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testAck() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "client");
+      subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      sendMessage(getName());
+      sendJmsMessage(getName());
 
-      ClientStompFrame frame = connV11.receiveFrame();
+      ClientStompFrame frame = conn.receiveFrame();
 
-      String messageID = frame.getHeader("message-id");
+      String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
 
-      ack(connV11, "sub1", messageID, null);
+      ack(conn, "sub1", messageID, null);
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //Nack makes the message be dropped.
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1057,25 +952,25 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testAckWithWrongSubId() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "client");
+      subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      sendMessage(getName());
+      sendJmsMessage(getName());
 
-      ClientStompFrame frame = connV11.receiveFrame();
+      ClientStompFrame frame = conn.receiveFrame();
 
-      String messageID = frame.getHeader("message-id");
+      String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
 
-      ack(connV11, "sub2", messageID, null);
+      ack(conn, "sub2", messageID, null);
 
-      ClientStompFrame error = connV11.receiveFrame();
+      ClientStompFrame error = conn.receiveFrame();
 
-      System.out.println("Receiver error: " + error);
+      IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //message should be still there
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1085,25 +980,25 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testAckWithWrongMessageId() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "client");
+      subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      sendMessage(getName());
+      sendJmsMessage(getName());
 
-      ClientStompFrame frame = connV11.receiveFrame();
+      ClientStompFrame frame = conn.receiveFrame();
 
-      frame.getHeader("message-id");
+      frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
 
-      ack(connV11, "sub2", "someother", null);
+      ack(conn, "sub2", "someother", null);
 
-      ClientStompFrame error = connV11.receiveFrame();
+      ClientStompFrame error = conn.receiveFrame();
 
-      System.out.println("Receiver error: " + error);
+      IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //message should still there
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1113,33 +1008,33 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testErrorWithReceipt() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "client");
+      subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      sendMessage(getName());
+      sendJmsMessage(getName());
 
-      ClientStompFrame frame = connV11.receiveFrame();
+      ClientStompFrame frame = conn.receiveFrame();
 
-      String messageID = frame.getHeader("message-id");
+      String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
 
-      ClientStompFrame ackFrame = connV11.createFrame("ACK");
       //give it a wrong sub id
-      ackFrame.addHeader("subscription", "sub2");
-      ackFrame.addHeader("message-id", messageID);
-      ackFrame.addHeader("receipt", "answer-me");
+      ClientStompFrame ackFrame = conn.createFrame(Stomp.Commands.ACK)
+                                      .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, "sub2")
+                                      .addHeader(Stomp.Headers.Message.MESSAGE_ID, messageID)
+                                      .addHeader(Stomp.Headers.RECEIPT_REQUESTED, "answer-me");
 
-      ClientStompFrame error = connV11.sendFrame(ackFrame);
+      ClientStompFrame error = conn.sendFrame(ackFrame);
 
-      System.out.println("Receiver error: " + error);
+      IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
 
-      assertEquals("ERROR", error.getCommand());
+      assertEquals(Stomp.Responses.ERROR, error.getCommand());
 
-      assertEquals("answer-me", error.getHeader("receipt-id"));
+      assertEquals("answer-me", error.getHeader(Stomp.Headers.Response.RECEIPT_ID));
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //message should still there
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1149,33 +1044,33 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testErrorWithReceipt2() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "client");
+      subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      sendMessage(getName());
+      sendJmsMessage(getName());
 
-      ClientStompFrame frame = connV11.receiveFrame();
+      ClientStompFrame frame = conn.receiveFrame();
 
-      String messageID = frame.getHeader("message-id");
+      String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
 
-      ClientStompFrame ackFrame = connV11.createFrame("ACK");
       //give it a wrong sub id
-      ackFrame.addHeader("subscription", "sub1");
-      ackFrame.addHeader("message-id", String.valueOf(Long.valueOf(messageID) + 1));
-      ackFrame.addHeader("receipt", "answer-me");
+      ClientStompFrame ackFrame = conn.createFrame(Stomp.Commands.ACK)
+                                      .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, "sub1")
+                                      .addHeader(Stomp.Headers.Message.MESSAGE_ID, String.valueOf(Long.valueOf(messageID) + 1))
+                                      .addHeader(Stomp.Headers.RECEIPT_REQUESTED, "answer-me");
 
-      ClientStompFrame error = connV11.sendFrame(ackFrame);
+      ClientStompFrame error = conn.sendFrame(ackFrame);
 
-      System.out.println("Receiver error: " + error);
+      IntegrationTestLogger.LOGGER.info("Receiver error: " + error);
 
-      assertEquals("ERROR", error.getCommand());
+      assertEquals(Stomp.Responses.ERROR, error.getCommand());
 
-      assertEquals("answer-me", error.getHeader("receipt-id"));
+      assertEquals("answer-me", error.getHeader(Stomp.Headers.Response.RECEIPT_ID));
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //message should still there
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1185,29 +1080,29 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testAckModeClient() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "client");
+      subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
       int num = 50;
       //send a bunch of messages
       for (int i = 0; i < num; i++) {
-         this.sendMessage("client-ack" + i);
+         this.sendJmsMessage("client-ack" + i);
       }
 
       ClientStompFrame frame = null;
 
       for (int i = 0; i < num; i++) {
-         frame = connV11.receiveFrame();
+         frame = conn.receiveFrame();
          assertNotNull(frame);
       }
 
       //ack the last
-      this.ack(connV11, "sub1", frame);
+      this.ack(conn, "sub1", frame);
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //no messages can be received.
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1217,31 +1112,31 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testAckModeClient2() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "client");
+      subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
       int num = 50;
       //send a bunch of messages
       for (int i = 0; i < num; i++) {
-         this.sendMessage("client-ack" + i);
+         this.sendJmsMessage("client-ack" + i);
       }
 
       ClientStompFrame frame = null;
 
       for (int i = 0; i < num; i++) {
-         frame = connV11.receiveFrame();
+         frame = conn.receiveFrame();
          assertNotNull(frame);
 
          //ack the 49th
          if (i == num - 2) {
-            this.ack(connV11, "sub1", frame);
+            this.ack(conn, "sub1", frame);
          }
       }
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //no messages can be received.
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1253,26 +1148,26 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testAckModeAuto() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "auto");
+      subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
       int num = 50;
       //send a bunch of messages
       for (int i = 0; i < num; i++) {
-         this.sendMessage("auto-ack" + i);
+         this.sendJmsMessage("auto-ack" + i);
       }
 
       ClientStompFrame frame = null;
 
       for (int i = 0; i < num; i++) {
-         frame = connV11.receiveFrame();
+         frame = conn.receiveFrame();
          assertNotNull(frame);
       }
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //no messages can be received.
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1282,32 +1177,32 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testAckModeClientIndividual() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      subscribe(connV11, "sub1", "client-individual");
+      subscribe(conn, "sub1", "client-individual");
 
       int num = 50;
       //send a bunch of messages
       for (int i = 0; i < num; i++) {
-         this.sendMessage("client-individual-ack" + i);
+         this.sendJmsMessage("client-individual-ack" + i);
       }
 
       ClientStompFrame frame = null;
 
       for (int i = 0; i < num; i++) {
-         frame = connV11.receiveFrame();
+         frame = conn.receiveFrame();
          assertNotNull(frame);
 
-         System.out.println(i + " == received: " + frame);
+         IntegrationTestLogger.LOGGER.info(i + " == received: " + frame);
          //ack on even numbers
          if (i % 2 == 0) {
-            this.ack(connV11, "sub1", frame);
+            this.ack(conn, "sub1", frame);
          }
       }
 
-      unsubscribe(connV11, "sub1");
+      unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
 
       //no messages can be received.
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1316,7 +1211,7 @@ public class StompV11Test extends StompV11TestBase {
       for (int i = 0; i < num / 2; i++) {
          message = (TextMessage) consumer.receive(1000);
          Assert.assertNotNull(message);
-         System.out.println("Legal: " + message.getText());
+         IntegrationTestLogger.LOGGER.info("Legal: " + message.getText());
       }
 
       message = (TextMessage) consumer.receive(1000);
@@ -1326,64 +1221,55 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testTwoSubscribers() throws Exception {
-      connV11.connect(defUser, defPass, CLIENT_ID);
+      conn.connect(defUser, defPass, CLIENT_ID);
 
-      this.subscribeTopic(connV11, "sub1", "auto", null);
+      this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
 
       StompClientConnection newConn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
       newConn.connect(defUser, defPass, "myclientid2");
 
-      this.subscribeTopic(newConn, "sub2", "auto", null);
-
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getTopicPrefix() + getTopicName());
-
-      frame.setBody("Hello World");
+      this.subscribeTopic(newConn, "sub2", Stomp.Headers.Subscribe.AckModeValues.AUTO, null);
 
-      connV11.sendFrame(frame);
+      send(conn, getTopicPrefix() + getTopicName(), null, "Hello World");
 
       // receive message from socket
-      frame = connV11.receiveFrame(1000);
+      ClientStompFrame frame = conn.receiveFrame(1000);
 
-      System.out.println("received frame : " + frame);
+      IntegrationTestLogger.LOGGER.info("received frame : " + frame);
       assertEquals("Hello World", frame.getBody());
-      assertEquals("sub1", frame.getHeader("subscription"));
+      assertEquals("sub1", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
 
       frame = newConn.receiveFrame(1000);
 
-      System.out.println("received 2 frame : " + frame);
+      IntegrationTestLogger.LOGGER.info("received 2 frame : " + frame);
       assertEquals("Hello World", frame.getBody());
-      assertEquals("sub2", frame.getHeader("subscription"));
+      assertEquals("sub2", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
 
       // remove suscription
-      this.unsubscribe(connV11, "sub1", true);
+      this.unsubscribe(conn, "sub1", true);
       this.unsubscribe(newConn, "sub2", true);
 
-      connV11.disconnect();
+      conn.disconnect();
       newConn.disconnect();
    }
 
    @Test
    public void testSendAndReceiveOnDifferentConnections() throws Exception {
-      connV11.connect(defUser, defPass);
-
-      ClientStompFrame sendFrame = connV11.createFrame("SEND");
-      sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      sendFrame.setBody("Hello World");
+      conn.connect(defUser, defPass);
 
-      connV11.sendFrame(sendFrame);
+      send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
 
       StompClientConnection connV11_2 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
       connV11_2.connect(defUser, defPass);
 
-      this.subscribe(connV11_2, "sub1", "auto");
+      this.subscribe(connV11_2, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
       ClientStompFrame frame = connV11_2.receiveFrame(2000);
 
-      assertEquals("MESSAGE", frame.getCommand());
+      assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
       assertEquals("Hello World", frame.getBody());
 
-      connV11.disconnect();
+      conn.disconnect();
       connV11_2.disconnect();
    }
 
@@ -1391,79 +1277,81 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testBeginSameTransactionTwice() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      beginTransaction(connV11, "tx1");
+      beginTransaction(conn, "tx1");
 
-      beginTransaction(connV11, "tx1");
+      beginTransaction(conn, "tx1");
 
-      ClientStompFrame f = connV11.receiveFrame();
-      Assert.assertTrue(f.getCommand().equals("ERROR"));
+      ClientStompFrame f = conn.receiveFrame();
+      Assert.assertTrue(f.getCommand().equals(Stomp.Responses.ERROR));
    }
 
    @Test
    public void testBodyWithUTF8() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      this.subscribe(connV11, getName(), "auto");
+      this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
       String text = "A" + "\u00ea" + "\u00f1" + "\u00fc" + "C";
-      System.out.println(text);
-      sendMessage(text);
+      IntegrationTestLogger.LOGGER.info(text);
+      sendJmsMessage(text);
 
-      ClientStompFrame frame = connV11.receiveFrame();
-      System.out.println(frame);
-      Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
-      Assert.assertNotNull(frame.getHeader("destination"));
+      ClientStompFrame frame = conn.receiveFrame();
+      IntegrationTestLogger.LOGGER.info(frame);
+      Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
+      Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION));
       Assert.assertTrue(frame.getBody().equals(text));
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testClientAckNotPartOfTransaction() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      this.subscribe(connV11, getName(), "client");
+      this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      sendMessage(getName());
+      sendJmsMessage(getName());
 
-      ClientStompFrame frame = connV11.receiveFrame();
+      ClientStompFrame frame = conn.receiveFrame();
 
-      Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
-      Assert.assertNotNull(frame.getHeader("destination"));
+      Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
+      Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION));
       Assert.assertTrue(frame.getBody().equals(getName()));
-      Assert.assertNotNull(frame.getHeader("message-id"));
+      Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
 
-      String messageID = frame.getHeader("message-id");
+      String messageID = frame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
 
-      beginTransaction(connV11, "tx1");
+      beginTransaction(conn, "tx1");
 
-      this.ack(connV11, getName(), messageID, "tx1");
+      this.ack(conn, getName(), messageID, "tx1");
 
-      abortTransaction(connV11, "tx1");
+      abortTransaction(conn, "tx1");
 
-      frame = connV11.receiveFrame(500);
+      frame = conn.receiveFrame(500);
 
       assertNull(frame);
 
-      this.unsubscribe(connV11, getName());
+      this.unsubscribe(conn, getName());
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testDisconnectAndError() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
+
+      this.subscribe(conn, getName(), Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      this.subscribe(connV11, getName(), "client");
+      String uuid = UUID.randomUUID().toString();
 
-      ClientStompFrame frame = connV11.createFrame("DISCONNECT");
-      frame.addHeader("receipt", "1");
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.DISCONNECT)
+                                   .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
 
-      ClientStompFrame result = connV11.sendFrame(frame);
+      ClientStompFrame result = conn.sendFrame(frame);
 
-      if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) {
+      if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
          fail("Disconnect failed! " + result);
       }
 
@@ -1472,12 +1360,9 @@ public class StompV11Test extends StompV11TestBase {
       Thread thr = new Thread() {
          @Override
          public void run() {
-            ClientStompFrame sendFrame = connV11.createFrame("SEND");
-            sendFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-            sendFrame.setBody("Hello World");
             while (latch.getCount() != 0) {
                try {
-                  connV11.sendFrame(sendFrame);
+                  send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
                   Thread.sleep(500);
                } catch (InterruptedException e) {
                   //retry
@@ -1490,7 +1375,7 @@ public class StompV11Test extends StompV11TestBase {
                   latch.countDown();
                   break;
                } finally {
-                  connV11.destroy();
+                  conn.destroy();
                }
             }
          }
@@ -1510,66 +1395,68 @@ public class StompV11Test extends StompV11TestBase {
 
    @Test
    public void testDurableSubscriber() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      this.subscribe(connV11, "sub1", "client", getName());
+      this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName());
 
-      this.subscribe(connV11, "sub1", "client", getName());
+      this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.CLIENT, getName());
 
-      ClientStompFrame frame = connV11.receiveFrame();
-      Assert.assertTrue(frame.getCommand().equals("ERROR"));
+      ClientStompFrame frame = conn.receiveFrame();
+      Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.ERROR));
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testDurableSubscriberWithReconnection() throws Exception {
-      connV11.connect(defUser, defPass, CLIENT_ID);
+      conn.connect(defUser, defPass, CLIENT_ID);
+
+      this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
 
-      this.subscribeTopic(connV11, "sub1", "auto", getName());
+      String uuid = UUID.randomUUID().toString();
 
-      ClientStompFrame frame = connV11.createFrame("DISCONNECT");
-      frame.addHeader("receipt", "1");
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.DISCONNECT)
+                                   .addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
 
-      ClientStompFrame result = connV11.sendFrame(frame);
+      ClientStompFrame result = conn.sendFrame(frame);
 
-      if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) {
+      if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
          fail("Disconnect failed! " + result);
       }
 
       // send the message when the durable subscriber is disconnected
-      sendMessage(getName(), topic);
+      sendJmsMessage(getName(), topic);
 
-      connV11.destroy();
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      connV11.connect(defUser, defPass, CLIENT_ID);
+      conn.destroy();
+      conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      conn.connect(defUser, defPass, CLIENT_ID);
 
-      this.subscribeTopic(connV11, "sub1", "auto", getName());
+      this.subscribeTopic(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
 
       // we must have received the message
-      frame = connV11.receiveFrame();
+      frame = conn.receiveFrame();
 
-      Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
-      Assert.assertNotNull(frame.getHeader("destination"));
+      Assert.assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
+      Assert.assertNotNull(frame.getHeader(Stomp.Headers.Message.DESTINATION));
       Assert.assertEquals(getName(), frame.getBody());
 
-      this.unsubscribe(connV11, "sub1");
+      this.unsubscribe(conn, "sub1");
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testDurableUnSubscribe() throws Exception {
-      connV11.connect(defUser, defPass, CLIENT_ID);
+      conn.connect(defUser, defPass, CLIENT_ID);
 
-      this.subscribeTopic(connV11, null, "auto", getName());
+      this.subscribeTopic(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO, getName());
 
-      connV11.disconnect();
-      connV11.destroy();
-      connV11 = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
-      connV11.connect(defUser, defPass, CLIENT_ID);
+      conn.disconnect();
+      conn.destroy();
+      conn = StompClientConnectionFactory.createClientConnection("1.1", hostname, port);
+      conn.connect(defUser, defPass, CLIENT_ID);
 
-      this.unsubscribe(connV11, getName(), false, true);
+      this.unsubscribe(conn, getName(), null, false, true);
 
       long start = System.currentTimeMillis();
       SimpleString queueName = SimpleString.toSimpleString(CLIENT_ID + "." + getName());
@@ -1579,21 +1466,21 @@ public class StompV11Test extends StompV11TestBase {
 
       assertNull(server.getActiveMQServer().locateQueue(queueName));
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testJMSXGroupIdCanBeSet() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("JMSXGroupID", "TEST");
-      frame.setBody("Hello World");
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader("JMSXGroupID", "TEST")
+                                   .setBody("Hello World");
 
-      connV11.sendFrame(frame);
+      conn.sendFrame(frame);
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -1607,64 +1494,64 @@ public class StompV11Test extends StompV11TestBase {
       int ctr = 10;
       String[] data = new String[ctr];
 
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      this.subscribe(connV11, "sub1", "auto");
+      this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO);
 
       for (int i = 0; i < ctr; ++i) {
          data[i] = getName() + i;
-         sendMessage(data[i]);
+         sendJmsMessage(data[i]);
       }
 
       ClientStompFrame frame = null;
 
       for (int i = 0; i < ctr; ++i) {
-         frame = connV11.receiveFrame();
+         frame = conn.receiveFrame();
          Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
       }
 
       for (int i = 0; i < ctr; ++i) {
          data[i] = getName() + ":second:" + i;
-         sendMessage(data[i]);
+         sendJmsMessage(data[i]);
       }
 
       for (int i = 0; i < ctr; ++i) {
-         frame = connV11.receiveFrame();
+         frame = conn.receiveFrame();
          Assert.assertTrue("Message not in order", frame.getBody().equals(data[i]));
       }
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testSubscribeWithAutoAckAndSelector() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      this.subscribe(connV11, "sub1", "auto", null, "foo = 'zzz'");
+      this.subscribe(conn, "sub1", Stomp.Headers.Subscribe.AckModeValues.AUTO, null, "foo = 'zzz'");
 
-      sendMessage("Ignored message", "foo", "1234");
-      sendMessage("Real message", "foo", "zzz");
+      sendJmsMessage("Ignored message", "foo", "1234");
+      sendJmsMessage("Real message", "foo", "zzz");
 
-      ClientStompFrame frame = connV11.receiveFrame();
+      ClientStompFrame frame = conn.receiveFrame();
 
       Assert.assertTrue("Should have received the real message but got: " + frame, frame.getBody().equals("Real message"));
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testRedeliveryWithClientAck() throws Exception {
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      this.subscribe(connV11, "subId", "client");
+      this.subscribe(conn, "subscriptionId", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
-      sendMessage(getName());
+      sendJmsMessage(getName());
 
-      ClientStompFrame frame = connV11.receiveFrame();
+      ClientStompFrame frame = conn.receiveFrame();
 
-      assertTrue(frame.getCommand().equals("MESSAGE"));
+      assertTrue(frame.getCommand().equals(Stomp.Responses.MESSAGE));
 
-      connV11.disconnect();
+      conn.disconnect();
 
       // message should be received since message was not acknowledged
       MessageConsumer consumer = session.createConsumer(queue);
@@ -1677,7 +1564,7 @@ public class StompV11Test extends StompV11TestBase {
    public void testSendManyMessages() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
       int count = 1000;
       final CountDownLatch latch = new CountDownLatch(count);
@@ -1688,30 +1575,22 @@ public class StompV11Test extends StompV11TestBase {
          }
       });
 
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.setBody("Hello World");
-
       for (int i = 1; i <= count; i++) {
-         connV11.sendFrame(frame);
+         send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
       }
 
       assertTrue(latch.await(60, TimeUnit.SECONDS));
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testSendMessage() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
-      connV11.connect(defUser, defPass);
-
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.setBody("Hello World");
+      conn.connect(defUser, defPass);
 
-      connV11.sendFrame(frame);
+      send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -1730,18 +1609,16 @@ public class StompV11Test extends StompV11TestBase {
    public void testSendMessageWithContentLength() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
       byte[] data = new byte[]{1, 0, 0, 4};
 
-      ClientStompFrame frame = connV11.createFrame("SEND");
-
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.setBody(new String(data, StandardCharsets.UTF_8));
-
-      frame.addHeader("content-length", String.valueOf(data.length));
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .setBody(new String(data, StandardCharsets.UTF_8))
+                                   .addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(data.length));
 
-      connV11.sendFrame(frame);
+      conn.sendFrame(frame);
 
       BytesMessage message = (BytesMessage) consumer.receive(10000);
       Assert.assertNotNull(message);
@@ -1757,16 +1634,15 @@ public class StompV11Test extends StompV11TestBase {
    public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
 
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("foo", "abc");
-      frame.addHeader("bar", "123");
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader("foo", "abc")
+                                   .addHeader("bar", "123")
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .setBody("Hello World");
 
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.setBody("Hello World");
-
-      connV11.sendFrame(frame);
+      conn.sendFrame(frame);
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -1779,14 +1655,13 @@ public class StompV11Test extends StompV11TestBase {
    public void testSendMessageWithLeadingNewLine() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
-      connV11.connect(defUser, defPass);
-
-      ClientStompFrame frame = connV11.createFrame("SEND");
+      conn.connect(defUser, defPass);
 
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.setBody("Hello World");
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .setBody("Hello World");
 
-      connV11.sendWickedFrame(frame);
+      conn.sendWickedFrame(frame);
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -1800,24 +1675,17 @@ public class StompV11Test extends StompV11TestBase {
 
       assertNull(consumer.receive(1000));
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testSendMessageWithReceipt() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("receipt", "1234");
-      frame.setBody("Hello World");
 
-      frame = connV11.sendFrame(frame);
-
-      assertTrue(frame.getCommand().equals("RECEIPT"));
-      assertEquals("1234", frame.getHeader("receipt-id"));
+      send(conn, getQueuePrefix() + getQueueName(), null, "Hello World", true);
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -1829,28 +1697,27 @@ public class StompV11Test extends StompV11TestBase {
       long tmsg = message.getJMSTimestamp();
       Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testSendMessageWithStandardHeaders() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
-      connV11.connect(defUser, defPass);
-
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("correlation-id", "c123");
-      frame.addHeader("persistent", "true");
-      frame.addHeader("priority", "3");
-      frame.addHeader("type", "t345");
-      frame.addHeader("JMSXGroupID", "abc");
-      frame.addHeader("foo", "abc");
-      frame.addHeader("bar", "123");
+      conn.connect(defUser, defPass);
 
-      frame.setBody("Hello World");
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+                                   .addHeader(Stomp.Headers.Send.DESTINATION, getQueuePrefix() + getQueueName())
+                                   .addHeader(Stomp.Headers.Message.CORRELATION_ID, "c123")
+                                   .addHeader(Stomp.Headers.Message.PERSISTENT, "true")
+                                   .addHeader(Stomp.Headers.Message.PRIORITY, "3")
+                                   .addHeader(Stomp.Headers.Message.TYPE, "t345")
+                                   .addHeader("JMSXGroupID", "abc")
+                                   .addHeader("foo", "abc")
+                                   .addHeader("bar", "123")
+                                   .setBody("Hello World");
 
-      frame = connV11.sendFrame(frame);
+      frame = conn.sendFrame(frame);
 
       TextMessage message = (TextMessage) consumer.receive(1000);
       Assert.assertNotNull(message);
@@ -1864,33 +1731,32 @@ public class StompV11Test extends StompV11TestBase {
 
       Assert.assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
 
-      connV11.disconnect();
+      conn.disconnect();
    }
 
    @Test
    public void testSendMessageWithLongHeaders() throws Exception {
       MessageConsumer consumer = session.createConsumer(queue);
 
-      connV11.connect(defUser, defPass);
+      conn.connect(defUser, defPass);
 
       StringBuffer buffer = new StringBuffer();
       for (int i = 0; i < 2048; i++) {
          buffer.append("a");
       }
 
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("cor

<TRUNCATED>