You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/11 19:23:35 UTC

[05/11] activemq-artemis git commit: Stomp refactor + track autocreation for addresses

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
index 7a1a529..d32823b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
@@ -18,52 +18,47 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.io.IOException;
 
-/**
- * pls use factory to create frames.
- */
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+
 public class StompClientConnectionV10 extends AbstractStompClientConnection {
 
    public StompClientConnectionV10(String host, int port) throws IOException {
       super("1.0", host, port);
    }
 
+   public StompClientConnectionV10(String version, String host, int port) throws IOException {
+      super(version, host, port);
+   }
+
    @Override
    public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
-      ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
-      frame.addHeader(LOGIN_HEADER, username);
-      frame.addHeader(PASSCODE_HEADER, passcode);
-
-      ClientStompFrame response = this.sendFrame(frame);
-
-      if (response.getCommand().equals(CONNECTED_COMMAND)) {
-         connected = true;
-      } else {
-         System.out.println("Connection failed with: " + response);
-         connected = false;
-      }
-      return response;
+      return connect(username, passcode, null);
    }
 
    @Override
-   public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
-      ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
-      frame.addHeader(LOGIN_HEADER, username);
-      frame.addHeader(PASSCODE_HEADER, passcode);
-      frame.addHeader(CLIENT_ID_HEADER, clientID);
+   public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
+      ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
+      frame.addHeader(Stomp.Headers.Connect.LOGIN, username);
+      frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode);
+      if (clientID != null) {
+         frame.addHeader(Stomp.Headers.Connect.CLIENT_ID, clientID);
+      }
 
       ClientStompFrame response = this.sendFrame(frame);
 
-      if (response.getCommand().equals(CONNECTED_COMMAND)) {
+      if (response.getCommand().equals(Stomp.Responses.CONNECTED)) {
          connected = true;
       } else {
-         System.out.println("Connection failed with: " + response);
+         IntegrationTestLogger.LOGGER.warn("Connection failed with: " + response);
          connected = false;
       }
+      return response;
    }
 
    @Override
    public void disconnect() throws IOException, InterruptedException {
-      ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
+      ClientStompFrame frame = factory.newFrame(Stomp.Commands.DISCONNECT);
       this.sendFrame(frame);
 
       close();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
index aa07145..cfc8f92 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
@@ -17,55 +17,40 @@
 package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.io.IOException;
+import java.util.UUID;
 
-public class StompClientConnectionV11 extends AbstractStompClientConnection {
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+
+public class StompClientConnectionV11 extends StompClientConnectionV10 {
 
    public StompClientConnectionV11(String host, int port) throws IOException {
       super("1.1", host, port);
    }
 
-   @Override
-   public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
-      ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
-      frame.addHeader(ACCEPT_HEADER, "1.1");
-      frame.addHeader(HOST_HEADER, "localhost");
-      if (username != null) {
-         frame.addHeader(LOGIN_HEADER, username);
-         frame.addHeader(PASSCODE_HEADER, passcode);
-      }
-
-      ClientStompFrame response = this.sendFrame(frame);
-
-      if (response.getCommand().equals(CONNECTED_COMMAND)) {
-         String version = response.getHeader(VERSION_HEADER);
-         assert (version.equals("1.1"));
-
-         this.username = username;
-         this.passcode = passcode;
-         this.connected = true;
-      } else {
-         connected = false;
-      }
-      return response;
+   public StompClientConnectionV11(String version, String host, int port) throws IOException {
+      super(version, host, port);
    }
 
    @Override
-   public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
-      ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
-      frame.addHeader(ACCEPT_HEADER, "1.1");
-      frame.addHeader(HOST_HEADER, "localhost");
-      frame.addHeader(CLIENT_ID_HEADER, clientID);
+   public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
+      ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
+      frame.addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, getVersion());
+      frame.addHeader(Stomp.Headers.Connect.HOST, "localhost");
+      if (clientID != null) {
+         frame.addHeader(Stomp.Headers.Connect.CLIENT_ID, clientID);
+      }
 
       if (username != null) {
-         frame.addHeader(LOGIN_HEADER, username);
-         frame.addHeader(PASSCODE_HEADER, passcode);
+         frame.addHeader(Stomp.Headers.Connect.LOGIN, username);
+         frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode);
       }
 
       ClientStompFrame response = this.sendFrame(frame);
 
-      if (response.getCommand().equals(CONNECTED_COMMAND)) {
-         String version = response.getHeader(VERSION_HEADER);
-         assert (version.equals("1.1"));
+      if (Stomp.Responses.CONNECTED.equals(response.getCommand())) {
+         String version = response.getHeader(Stomp.Headers.Connected.VERSION);
+         if (!version.equals(getVersion()))
+            throw new IllegalStateException("incorrect version!");
 
          this.username = username;
          this.passcode = passcode;
@@ -73,22 +58,24 @@ public class StompClientConnectionV11 extends AbstractStompClientConnection {
       } else {
          connected = false;
       }
+      return response;
    }
 
    public void connect1(String username, String passcode) throws IOException, InterruptedException {
-      ClientStompFrame frame = factory.newFrame(STOMP_COMMAND);
-      frame.addHeader(ACCEPT_HEADER, "1.0,1.1");
-      frame.addHeader(HOST_HEADER, "127.0.0.1");
+      ClientStompFrame frame = factory.newFrame(Stomp.Commands.STOMP);
+      frame.addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
+      frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1");
       if (username != null) {
-         frame.addHeader(LOGIN_HEADER, username);
-         frame.addHeader(PASSCODE_HEADER, passcode);
+         frame.addHeader(Stomp.Headers.Connect.LOGIN, username);
+         frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode);
       }
 
       ClientStompFrame response = this.sendFrame(frame);
 
-      if (response.getCommand().equals(CONNECTED_COMMAND)) {
-         String version = response.getHeader(VERSION_HEADER);
-         assert (version.equals("1.1"));
+      if (Stomp.Responses.CONNECTED.equals(response.getCommand())) {
+         String version = response.getHeader(Stomp.Headers.Connected.VERSION);
+         if (!version.equals(getVersion()))
+            throw new IllegalStateException("incorrect version!");
 
          this.username = username;
          this.passcode = passcode;
@@ -103,12 +90,15 @@ public class StompClientConnectionV11 extends AbstractStompClientConnection {
    public void disconnect() throws IOException, InterruptedException {
       stopPinger();
 
-      ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
-      frame.addHeader("receipt", "1");
+      ClientStompFrame frame = factory.newFrame(Stomp.Commands.DISCONNECT);
+
+      String uuid = UUID.randomUUID().toString();
+
+      frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
 
       ClientStompFrame result = this.sendFrame(frame);
 
-      if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) {
+      if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
          throw new IOException("Disconnect failed! " + result);
       }
 
@@ -122,4 +112,28 @@ public class StompClientConnectionV11 extends AbstractStompClientConnection {
       return factory.newFrame(command);
    }
 
+   @Override
+   public void startPinger(long interval) {
+      pinger = new Pinger(interval);
+      pinger.startPing();
+   }
+
+   @Override
+   public void stopPinger() {
+      if (pinger != null) {
+         pinger.stopPing();
+         try {
+            pinger.join();
+         } catch (InterruptedException e) {
+            e.printStackTrace();
+         }
+         pinger = null;
+      }
+   }
+
+   @Override
+   public int getServerPingNumber() {
+      return serverPingCounter;
+   }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
index fb77832..2d8f354 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
@@ -18,90 +18,13 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.io.IOException;
 
-public class StompClientConnectionV12 extends AbstractStompClientConnection {
+public class StompClientConnectionV12 extends StompClientConnectionV11 {
 
    public StompClientConnectionV12(String host, int port) throws IOException {
       super("1.2", host, port);
    }
 
-   @Override
-   public ClientStompFrame createFrame(String command) {
-      return factory.newFrame(command);
-   }
-
-   @Override
-   public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
-      ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
-      frame.addHeader(ACCEPT_HEADER, "1.2");
-      frame.addHeader(HOST_HEADER, "localhost");
-      if (username != null) {
-         frame.addHeader(LOGIN_HEADER, username);
-         frame.addHeader(PASSCODE_HEADER, passcode);
-      }
-
-      ClientStompFrame response = this.sendFrame(frame);
-
-      if (response.getCommand().equals(CONNECTED_COMMAND)) {
-         String version = response.getHeader(VERSION_HEADER);
-         if (!version.equals("1.2"))
-            throw new IllegalStateException("incorrect version!");
-
-         this.username = username;
-         this.passcode = passcode;
-         this.connected = true;
-      } else {
-         connected = false;
-      }
-      return response;
-   }
-
-   @Override
-   public void disconnect() throws IOException, InterruptedException {
-      stopPinger();
-
-      ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
-      frame.addHeader("receipt", "1");
-
-      ClientStompFrame result = this.sendFrame(frame);
-
-      if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) {
-         throw new IOException("Disconnect failed! " + result);
-      }
-
-      close();
-
-      connected = false;
-   }
-
-   @Override
-   public void connect(String username, String passcode, String clientID) throws Exception {
-      ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
-      frame.addHeader(ACCEPT_HEADER, "1.2");
-      frame.addHeader(HOST_HEADER, "localhost");
-      frame.addHeader(CLIENT_ID_HEADER, clientID);
-
-      if (username != null) {
-         frame.addHeader(LOGIN_HEADER, username);
-         frame.addHeader(PASSCODE_HEADER, passcode);
-      }
-
-      ClientStompFrame response = this.sendFrame(frame);
-
-      if (response.getCommand().equals(CONNECTED_COMMAND)) {
-         String version = response.getHeader(VERSION_HEADER);
-         if (!version.equals("1.2"))
-            throw new IllegalStateException("incorrect version!");
-
-         this.username = username;
-         this.passcode = passcode;
-         this.connected = true;
-      } else {
-         connected = false;
-      }
-   }
-
    public ClientStompFrame createAnyFrame(String command) {
       return factory.newAnyFrame(command);
    }
-
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java
index 3ec03cf..1c78c5a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java
@@ -24,4 +24,6 @@ public interface StompFrameFactory {
 
    ClientStompFrame newAnyFrame(String command);
 
+   String[] handleHeaders(String header);
+
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java
index 5ed8566..8813fd6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java
@@ -38,18 +38,18 @@ import java.util.StringTokenizer;
 public class StompFrameFactoryV10 implements StompFrameFactory {
 
    @Override
-   public ClientStompFrame createFrame(String data) {
+   public ClientStompFrame createFrame(final String data) {
       //split the string at "\n\n"
       String[] dataFields = data.split("\n\n");
 
       StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
 
       String command = tokenizer.nextToken();
-      ClientStompFrame frame = new ClientStompFrameV10(command);
+      ClientStompFrame frame = newFrame(command);
 
       while (tokenizer.hasMoreTokens()) {
          String header = tokenizer.nextToken();
-         String[] fields = header.split(":");
+         String[] fields = handleHeaders(header);
          frame.addHeader(fields[0], fields[1]);
       }
 
@@ -61,6 +61,11 @@ public class StompFrameFactoryV10 implements StompFrameFactory {
    }
 
    @Override
+   public String[] handleHeaders(String header) {
+      return header.split(":");
+   }
+
+   @Override
    public ClientStompFrame newFrame(String command) {
       return new ClientStompFrameV10(command);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java
index 4de0d7d..807b3f0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java
@@ -36,32 +36,10 @@ import java.util.StringTokenizer;
  * 13. RECEIPT
  * 14. ERROR
  */
-public class StompFrameFactoryV11 implements StompFrameFactory {
+public class StompFrameFactoryV11 extends StompFrameFactoryV10 {
 
    @Override
-   public ClientStompFrame createFrame(final String data) {
-      //split the string at "\n\n"
-      String[] dataFields = data.split("\n\n");
-
-      StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
-
-      String command = tokenizer.nextToken();
-      ClientStompFrame frame = new ClientStompFrameV11(command);
-
-      while (tokenizer.hasMoreTokens()) {
-         String header = tokenizer.nextToken();
-         String[] fields = splitAndDecodeHeader(header);
-         frame.addHeader(fields[0], fields[1]);
-      }
-
-      //body (without null byte)
-      if (dataFields.length == 2) {
-         frame.setBody(dataFields[1]);
-      }
-      return frame;
-   }
-
-   private String[] splitAndDecodeHeader(String header) {
+   public String[] handleHeaders(String header) {
       // split the header into the key and value at the ":" since there shouldn't be any unescaped colons in the header
       // except for the one separating the key and value
       String[] result = header.split(":");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java
index 5223b4e..dbd32e0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java
@@ -18,42 +18,10 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.util.StringTokenizer;
 
-public class StompFrameFactoryV12 implements StompFrameFactory {
+public class StompFrameFactoryV12 extends StompFrameFactoryV11 {
 
    @Override
-   public ClientStompFrame createFrame(String data) {
-      //split the string at "\n\n"
-      String[] dataFields = data.split("\n\n");
-
-      StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
-
-      String command = tokenizer.nextToken();
-      ClientStompFrame frame = new ClientStompFrameV12(command);
-
-      while (tokenizer.hasMoreTokens()) {
-         String header = tokenizer.nextToken();
-         String[] fields = splitAndDecodeHeader(header);
-         frame.addHeader(fields[0], fields[1]);
-      }
-
-      //body (without null byte)
-      if (dataFields.length == 2) {
-         frame.setBody(dataFields[1]);
-      }
-      return frame;
-   }
-
-   public void printByteHeader(String headers) {
-      StringBuffer buffer = new StringBuffer();
-
-      for (int i = 0; i < headers.length(); i++) {
-         char c = headers.charAt(i);
-         buffer.append((byte) c + " ");
-      }
-      System.out.println("header in byte : " + buffer.toString());
-   }
-
-   private String[] splitAndDecodeHeader(String header) {
+   public String[] handleHeaders(String header) {
       // split the header into the key and value at the ":" since there shouldn't be any unescaped colons in the header
       // except for the one separating the key and value
       String[] result = header.split(":");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java
index a5d3068..5414a9f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.stomp.v11;
 
 import java.nio.charset.StandardCharsets;
 
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
 import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
 import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
@@ -28,15 +30,18 @@ import org.junit.Test;
 /*
  * Some Stomp tests against server with persistence enabled are put here.
  */
-public class ExtraStompTest extends StompV11TestBase {
+public class ExtraStompTest extends StompTestBase {
 
    private StompClientConnection connV10;
    private StompClientConnection connV11;
 
+   public boolean isPersistenceEnabled() {
+      return true;
+   }
+
    @Override
    @Before
    public void setUp() throws Exception {
-      persistenceEnabled = true;
       super.setUp();
       connV10 = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
       connV10.connect(defUser, defPass);
@@ -57,331 +62,142 @@ public class ExtraStompTest extends StompV11TestBase {
 
    @Test
    public void testSendAndReceive10() throws Exception {
-      String msg1 = "Hello World 1!";
-      String msg2 = "Hello World 2!";
-
-      ClientStompFrame frame = connV10.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-length", String.valueOf(msg1.getBytes(StandardCharsets.UTF_8).length));
-      frame.addHeader("persistent", "true");
-      frame.setBody(msg1);
-
-      connV10.sendFrame(frame);
-
-      ClientStompFrame frame2 = connV10.createFrame("SEND");
-      frame2.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame2.addHeader("content-length", String.valueOf(msg2.getBytes(StandardCharsets.UTF_8).length));
-      frame2.addHeader("persistent", "true");
-      frame2.setBody(msg2);
-
-      connV10.sendFrame(frame2);
-
-      ClientStompFrame subFrame = connV10.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-
-      connV10.sendFrame(subFrame);
-
-      frame = connV10.receiveFrame();
-
-      System.out.println("received " + frame);
-
-      assertEquals("MESSAGE", frame.getCommand());
-
-      assertEquals("a-sub", frame.getHeader("subscription"));
-
-      assertNotNull(frame.getHeader("message-id"));
-
-      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
-
-      assertEquals(msg1, frame.getBody());
-
-      frame = connV10.receiveFrame();
-
-      System.out.println("received " + frame);
-
-      assertEquals("MESSAGE", frame.getCommand());
-
-      assertEquals("a-sub", frame.getHeader("subscription"));
-
-      assertNotNull(frame.getHeader("message-id"));
-
-      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
-
-      assertEquals(msg2, frame.getBody());
-
-      //unsub
-      ClientStompFrame unsubFrame = connV10.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      connV10.sendFrame(unsubFrame);
-
+      testSendAndReceive(connV10);
    }
 
    @Test
    public void testSendAndReceive11() throws Exception {
+      testSendAndReceive(connV11);
+   }
+
+   public void testSendAndReceive(StompClientConnection conn) throws Exception {
       String msg1 = "Hello World 1!";
       String msg2 = "Hello World 2!";
 
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-length", String.valueOf(msg1.getBytes(StandardCharsets.UTF_8).length));
-      frame.addHeader("persistent", "true");
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND);
+      frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+      frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(msg1.getBytes(StandardCharsets.UTF_8).length));
+      frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
       frame.setBody(msg1);
 
-      connV11.sendFrame(frame);
+      conn.sendFrame(frame);
 
-      ClientStompFrame frame2 = connV11.createFrame("SEND");
-      frame2.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame2.addHeader("content-length", String.valueOf(msg2.getBytes(StandardCharsets.UTF_8).length));
-      frame2.addHeader("persistent", "true");
+      ClientStompFrame frame2 = conn.createFrame(Stomp.Commands.SEND);
+      frame2.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+      frame2.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(msg2.getBytes(StandardCharsets.UTF_8).length));
+      frame2.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
       frame2.setBody(msg2);
 
-      connV11.sendFrame(frame2);
-
-      ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-
-      connV11.sendFrame(subFrame);
-
-      frame = connV11.receiveFrame();
-
-      System.out.println("received " + frame);
-
-      assertEquals("MESSAGE", frame.getCommand());
+      conn.sendFrame(frame2);
 
-      assertEquals("a-sub", frame.getHeader("subscription"));
+      subscribe(conn, "a-sub");
 
-      assertNotNull(frame.getHeader("message-id"));
-
-      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+      frame = conn.receiveFrame();
 
+      assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      assertEquals("a-sub", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
+      assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
+      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Subscribe.DESTINATION));
       assertEquals(msg1, frame.getBody());
 
-      frame = connV11.receiveFrame();
-
-      System.out.println("received " + frame);
-
-      assertEquals("MESSAGE", frame.getCommand());
-
-      assertEquals("a-sub", frame.getHeader("subscription"));
-
-      assertNotNull(frame.getHeader("message-id"));
-
-      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+      frame = conn.receiveFrame();
 
+      assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+      assertEquals("a-sub", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
+      assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
+      assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Subscribe.DESTINATION));
       assertEquals(msg2, frame.getBody());
 
-      //unsub
-      ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      connV11.sendFrame(unsubFrame);
+      unsubscribe(conn, "a-sub");
    }
 
    @Test
    public void testNoGarbageAfterPersistentMessageV10() throws Exception {
-      ClientStompFrame subFrame = connV10.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-
-      connV10.sendFrame(subFrame);
+      testNoGarbageAfterPersistentMessage(connV10);
+   }
 
-      ClientStompFrame frame = connV10.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-length", "11");
-      frame.addHeader("persistent", "true");
+   @Test
+   public void testNoGarbageAfterPersistentMessageV11() throws Exception {
+      testNoGarbageAfterPersistentMessage(connV11);
+   }   
+   
+   public void testNoGarbageAfterPersistentMessage(StompClientConnection conn) throws Exception {
+      subscribe(conn, "a-sub");
+
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND);
+      frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+      frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11");
+      frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
       frame.setBody("Hello World");
 
-      connV10.sendFrame(frame);
+      conn.sendFrame(frame);
 
-      frame = connV10.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-length", "11");
-      frame.addHeader("persistent", "true");
+      frame = conn.createFrame(Stomp.Commands.SEND);
+      frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+      frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11");
+      frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
       frame.setBody("Hello World");
 
-      connV10.sendFrame(frame);
-
-      frame = connV10.receiveFrame(10000);
+      conn.sendFrame(frame);
 
-      System.out.println("received: " + frame);
+      frame = conn.receiveFrame(10000);
 
       assertEquals("Hello World", frame.getBody());
 
       //if activemq sends trailing garbage bytes, the second message
       //will not be normal
-      frame = connV10.receiveFrame(10000);
-
-      System.out.println("received: " + frame);
+      frame = conn.receiveFrame(10000);
 
       assertEquals("Hello World", frame.getBody());
 
-      //unsub
-      ClientStompFrame unsubFrame = connV10.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      connV10.sendFrame(unsubFrame);
-
+      unsubscribe(conn, "a-sub");
    }
 
    @Test
    public void testNoGarbageOnPersistentRedeliveryV10() throws Exception {
-
-      ClientStompFrame frame = connV10.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-length", "11");
-      frame.addHeader("persistent", "true");
-      frame.setBody("Hello World");
-
-      connV10.sendFrame(frame);
-
-      frame = connV10.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-length", "11");
-      frame.addHeader("persistent", "true");
-      frame.setBody("Hello World");
-
-      connV10.sendFrame(frame);
-
-      ClientStompFrame subFrame = connV10.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "client");
-
-      connV10.sendFrame(subFrame);
-
-      // receive but don't ack
-      frame = connV10.receiveFrame(10000);
-      frame = connV10.receiveFrame(10000);
-
-      System.out.println("received: " + frame);
-
-      //unsub
-      ClientStompFrame unsubFrame = connV10.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      connV10.sendFrame(unsubFrame);
-
-      subFrame = connV10.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-
-      connV10.sendFrame(subFrame);
-
-      frame = connV10.receiveFrame(10000);
-      frame = connV10.receiveFrame(10000);
-
-      //second receive will get problem if trailing bytes
-      assertEquals("Hello World", frame.getBody());
-
-      System.out.println("received again: " + frame);
-
-      //unsub
-      unsubFrame = connV10.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      connV10.sendFrame(unsubFrame);
+      testNoGarbageOnPersistentRedelivery(connV10);
    }
 
    @Test
-   public void testNoGarbageAfterPersistentMessageV11() throws Exception {
-      ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
-
-      connV11.sendFrame(subFrame);
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-length", "11");
-      frame.addHeader("persistent", "true");
-      frame.setBody("Hello World");
-
-      connV11.sendFrame(frame);
-
-      frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-length", "11");
-      frame.addHeader("persistent", "true");
-      frame.setBody("Hello World");
-
-      connV11.sendFrame(frame);
-      frame = connV11.receiveFrame(10000);
-
-      System.out.println("received: " + frame);
-
-      assertEquals("Hello World", frame.getBody());
-
-      //if activemq sends trailing garbage bytes, the second message
-      //will not be normal
-      frame = connV11.receiveFrame(10000);
-
-      System.out.println("received: " + frame);
-
-      assertEquals("Hello World", frame.getBody());
-
-      //unsub
-      ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      connV11.sendFrame(unsubFrame);
+   public void testNoGarbageOnPersistentRedeliveryV11() throws Exception {
+      testNoGarbageOnPersistentRedelivery(connV11);
    }
 
-   @Test
-   public void testNoGarbageOnPersistentRedeliveryV11() throws Exception {
-      ClientStompFrame frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-length", "11");
-      frame.addHeader("persistent", "true");
+   public void testNoGarbageOnPersistentRedelivery(StompClientConnection conn) throws Exception {
+      ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND);
+      frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+      frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11");
+      frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
       frame.setBody("Hello World");
 
-      connV11.sendFrame(frame);
+      conn.sendFrame(frame);
 
-      frame = connV11.createFrame("SEND");
-      frame.addHeader("destination", getQueuePrefix() + getQueueName());
-      frame.addHeader("content-length", "11");
-      frame.addHeader("persistent", "true");
+      frame = conn.createFrame(Stomp.Commands.SEND);
+      frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+      frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11");
+      frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
       frame.setBody("Hello World");
 
-      connV11.sendFrame(frame);
-
-      ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "client");
+      conn.sendFrame(frame);
 
-      connV11.sendFrame(subFrame);
+      subscribe(conn, "a-sub", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
 
       // receive but don't ack
-      frame = connV11.receiveFrame(10000);
-      frame = connV11.receiveFrame(10000);
-
-      System.out.println("received: " + frame);
+      frame = conn.receiveFrame(10000);
+      frame = conn.receiveFrame(10000);
 
-      //unsub
-      ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      connV11.sendFrame(unsubFrame);
-
-      subFrame = connV11.createFrame("SUBSCRIBE");
-      subFrame.addHeader("id", "a-sub");
-      subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-      subFrame.addHeader("ack", "auto");
+      unsubscribe(conn, "a-sub");
 
-      connV11.sendFrame(subFrame);
+      subscribe(conn, "a-sub");
 
-      frame = connV11.receiveFrame(10000);
-      frame = connV11.receiveFrame(10000);
+      frame = conn.receiveFrame(10000);
+      frame = conn.receiveFrame(10000);
 
       //second receive will get problem if trailing bytes
       assertEquals("Hello World", frame.getBody());
 
-      System.out.println("received again: " + frame);
-
       //unsub
-      unsubFrame = connV11.createFrame("UNSUBSCRIBE");
-      unsubFrame.addHeader("id", "a-sub");
-      connV11.sendFrame(unsubFrame);
+      unsubscribe(conn, "a-sub");
    }
 
 }