You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/11 19:23:38 UTC

[08/11] activemq-artemis git commit: Stomp refactor + track autocreation for addresses

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 159a285..8822015 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -513,7 +513,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
       }
 
       /* (non-Javadoc)
-       * @see SessionCallback#sendMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
+       * @see SessionCallback#sendJmsMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
        */
       @Override
       public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
@@ -592,7 +592,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
                                                         SessionCallback callback,
                                                         OperationContext context,
                                                         boolean autoCreateQueue) throws Exception {
-         return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null, getPagingManager());
+         return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, getPagingManager());
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java
deleted file mode 100644
index 6917cfb..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.stomp;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ConcurrentStompTest extends StompTestBase {
-
-   private Socket stompSocket_2;
-
-   private ByteArrayOutputStream inputBuffer_2;
-
-   /**
-    * Send messages on 1 socket and receives them concurrently on another socket.
-    */
-   @Test
-   public void testSendManyMessages() throws Exception {
-      try {
-         String connect = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-
-         sendFrame(connect);
-         String connected = receiveFrame(10000);
-         Assert.assertTrue(connected.startsWith("CONNECTED"));
-
-         stompSocket_2 = createSocket();
-         inputBuffer_2 = new ByteArrayOutputStream();
-
-         sendFrame(stompSocket_2, connect);
-         connected = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
-         Assert.assertTrue(connected.startsWith("CONNECTED"));
-
-         final int count = 1000;
-         final CountDownLatch latch = new CountDownLatch(count);
-
-         String subscribe = "SUBSCRIBE\n" +
-            "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-            "ack:auto\n\n" +
-            Stomp.NULL;
-         sendFrame(stompSocket_2, subscribe);
-         Thread.sleep(2000);
-
-         new Thread() {
-            @Override
-            public void run() {
-               int i = 0;
-               while (true) {
-                  try {
-                     String frame = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
-                     Assert.assertTrue(frame.startsWith("MESSAGE"));
-                     Assert.assertTrue(frame.indexOf("destination:") > 0);
-                     System.out.println("<<< " + i++);
-                     latch.countDown();
-                  } catch (Exception e) {
-                     break;
-                  }
-               }
-            }
-         }.start();
-
-         String send = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n";
-         for (int i = 1; i <= count; i++) {
-            // Thread.sleep(1);
-            System.out.println(">>> " + i);
-            sendFrame(send + "count:" + i + "\n\n" + Stomp.NULL);
-         }
-
-         assertTrue(latch.await(60, TimeUnit.SECONDS));
-
-      } finally {
-         stompSocket_2.close();
-         inputBuffer_2.close();
-      }
-
-   }
-
-   // Implementation methods
-   // -------------------------------------------------------------------------
-   public void sendFrame(Socket socket, String data) throws Exception {
-      byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
-      OutputStream outputStream = socket.getOutputStream();
-      for (byte b : bytes) {
-         outputStream.write(b);
-      }
-      outputStream.flush();
-   }
-
-   public String receiveFrame(Socket socket, ByteArrayOutputStream input, long timeOut) throws Exception {
-      socket.setSoTimeout((int) timeOut);
-      InputStream is = socket.getInputStream();
-      int c = 0;
-      for (;;) {
-         c = is.read();
-         if (c < 0) {
-            throw new IOException("socket closed.");
-         } else if (c == 0) {
-            c = is.read();
-            if (c != '\n') {
-               byte[] ba = input.toByteArray();
-               System.out.println(new String(ba, StandardCharsets.UTF_8));
-            }
-            Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
-            byte[] ba = input.toByteArray();
-            input.reset();
-            return new String(ba, StandardCharsets.UTF_8);
-         } else {
-            input.write(c);
-         }
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java
deleted file mode 100644
index a0dcdbf..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java
+++ /dev/null
@@ -1,848 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.stomp;
-
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.QueueBrowser;
-import javax.jms.TextMessage;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.activemq.artemis.api.core.Interceptor;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.protocol.core.Packet;
-import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
-import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
-import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
-import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
-import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
-import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServers;
-import org.apache.activemq.artemis.jms.server.JMSServerManager;
-import org.apache.activemq.artemis.jms.server.config.JMSConfiguration;
-import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
-import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
-import org.apache.activemq.artemis.jms.server.config.impl.TopicConfigurationImpl;
-import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
-import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
-import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
-import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
-import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ExtraStompTest extends StompTestBase {
-
-   @Override
-   @Before
-   public void setUp() throws Exception {
-      autoCreateServer = false;
-      super.setUp();
-   }
-
-   @Test
-   public void testConnectionTTL() throws Exception {
-      try {
-         server = createServerWithTTL("2000");
-         server.start();
-
-         setUpAfterServer();
-
-         String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
-         sendFrame(connect_frame);
-
-         String f = receiveFrame(10000);
-         Assert.assertTrue(f.startsWith("CONNECTED"));
-         Assert.assertTrue(f.indexOf("response-id:1") >= 0);
-
-         String frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World 1" + Stomp.NULL;
-         sendFrame(frame);
-
-         assertChannelClosed();
-
-         MessageConsumer consumer = session.createConsumer(queue);
-
-         TextMessage message = (TextMessage) consumer.receiveNoWait();
-         Assert.assertNotNull(message);
-
-         message = (TextMessage) consumer.receiveNoWait();
-         Assert.assertNull(message);
-      } finally {
-         cleanUp();
-         server.stop();
-      }
-   }
-
-   @Test
-   public void testEnableMessageID() throws Exception {
-      enableMessageIDTest(true);
-   }
-
-   @Test
-   public void testDisableMessageID() throws Exception {
-      enableMessageIDTest(false);
-   }
-
-   @Test
-   public void testDefaultEnableMessageID() throws Exception {
-      enableMessageIDTest(null);
-   }
-
-   //stomp sender -> large -> stomp receiver
-   @Test
-   public void testSendReceiveLargePersistentMessages() throws Exception {
-      try {
-         server = createPersistentServerWithStompMinLargeSize(2048);
-         server.start();
-
-         setUpAfterServer();
-
-         String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-         sendFrame(frame);
-         frame = receiveFrame(10000);
-
-         Assert.assertTrue(frame.startsWith("CONNECTED"));
-         int count = 10;
-         int szBody = 1024 * 1024;
-         char[] contents = new char[szBody];
-         for (int i = 0; i < szBody; i++) {
-            contents[i] = 'A';
-         }
-         String body = new String(contents);
-
-         frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "persistent:true\n" + "\n\n" + body + Stomp.NULL;
-
-         for (int i = 0; i < count; i++) {
-            sendFrame(frame);
-         }
-
-         frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
-         sendFrame(frame);
-
-         for (int i = 0; i < count; i++) {
-            frame = receiveFrame(60000);
-            Assert.assertNotNull(frame);
-            System.out.println("part of frame: " + frame.substring(0, 200));
-            Assert.assertTrue(frame.startsWith("MESSAGE"));
-            Assert.assertTrue(frame.indexOf("destination:") > 0);
-            int index = frame.indexOf("AAAA");
-            assertEquals(szBody, (frame.length() - index));
-         }
-
-         // remove suscription
-         frame = "UNSUBSCRIBE\n" + "destination:" +
-            getQueuePrefix() +
-            getQueueName() +
-            "\n" +
-            "receipt:567\n" +
-            "\n\n" +
-            Stomp.NULL;
-         sendFrame(frame);
-         waitForReceipt();
-
-         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-         sendFrame(frame);
-      } catch (Exception ex) {
-         ex.printStackTrace();
-         throw ex;
-      } finally {
-         cleanUp();
-         server.stop();
-      }
-   }
-
-   //core sender -> large -> stomp receiver
-   @Test
-   public void testReceiveLargePersistentMessagesFromCore() throws Exception {
-      try {
-         server = createPersistentServerWithStompMinLargeSize(2048);
-         server.start();
-
-         setUpAfterServer();
-
-         int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-         char[] contents = new char[msgSize];
-         for (int i = 0; i < msgSize; i++) {
-            contents[i] = 'B';
-         }
-         String msg = new String(contents);
-
-         int count = 10;
-         for (int i = 0; i < count; i++) {
-            this.sendMessage(msg);
-         }
-
-         String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-         sendFrame(frame);
-         frame = receiveFrame(10000);
-
-         Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-         frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
-         sendFrame(frame);
-
-         for (int i = 0; i < count; i++) {
-            frame = receiveFrame(60000);
-            Assert.assertNotNull(frame);
-            System.out.println("part of frame: " + frame.substring(0, 250));
-            Assert.assertTrue(frame.startsWith("MESSAGE"));
-            Assert.assertTrue(frame.indexOf("destination:") > 0);
-            int index = frame.indexOf("BBBB");
-            assertEquals(msgSize, (frame.length() - index));
-         }
-
-         // remove suscription
-         frame = "UNSUBSCRIBE\n" + "destination:" +
-            getQueuePrefix() +
-            getQueueName() +
-            "\n" +
-            "receipt:567\n" +
-            "\n\n" +
-            Stomp.NULL;
-         sendFrame(frame);
-         waitForReceipt();
-
-         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-         sendFrame(frame);
-      } catch (Exception ex) {
-         ex.printStackTrace();
-         throw ex;
-      } finally {
-         cleanUp();
-         server.stop();
-      }
-   }
-
-   //stomp v12 sender -> large -> stomp v12 receiver
-   @Test
-   public void testSendReceiveLargePersistentMessagesV12() throws Exception {
-      try {
-         server = createPersistentServerWithStompMinLargeSize(2048);
-         server.start();
-
-         setUpAfterServer();
-
-         StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
-         connV12.connect(defUser, defPass);
-
-         int count = 10;
-         int szBody = 1024 * 1024;
-         char[] contents = new char[szBody];
-         for (int i = 0; i < szBody; i++) {
-            contents[i] = 'A';
-         }
-         String body = new String(contents);
-
-         ClientStompFrame frame = connV12.createFrame("SEND");
-         frame.addHeader("destination", getQueuePrefix() + getQueueName());
-         frame.addHeader("persistent", "true");
-         frame.setBody(body);
-
-         for (int i = 0; i < count; i++) {
-            connV12.sendFrame(frame);
-         }
-
-         ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
-         subFrame.addHeader("id", "a-sub");
-         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-         subFrame.addHeader("ack", "auto");
-
-         connV12.sendFrame(subFrame);
-
-         for (int i = 0; i < count; i++) {
-            ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
-
-            Assert.assertNotNull(receiveFrame);
-            System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
-            Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
-            Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
-            assertEquals(szBody, receiveFrame.getBody().length());
-         }
-
-         // remove susbcription
-         ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
-         unsubFrame.addHeader("id", "a-sub");
-         connV12.sendFrame(unsubFrame);
-
-         connV12.disconnect();
-      } catch (Exception ex) {
-         ex.printStackTrace();
-         throw ex;
-      } finally {
-         cleanUp();
-         server.stop();
-      }
-   }
-
-   //core sender -> large -> stomp v12 receiver
-   @Test
-   public void testReceiveLargePersistentMessagesFromCoreV12() throws Exception {
-      try {
-         server = createPersistentServerWithStompMinLargeSize(2048);
-         server.start();
-
-         setUpAfterServer();
-
-         int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
-         char[] contents = new char[msgSize];
-         for (int i = 0; i < msgSize; i++) {
-            contents[i] = 'B';
-         }
-         String msg = new String(contents);
-
-         int count = 10;
-         for (int i = 0; i < count; i++) {
-            this.sendMessage(msg);
-         }
-
-         StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
-         connV12.connect(defUser, defPass);
-
-         ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
-         subFrame.addHeader("id", "a-sub");
-         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-         subFrame.addHeader("ack", "auto");
-
-         connV12.sendFrame(subFrame);
-
-         for (int i = 0; i < count; i++) {
-            ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
-
-            Assert.assertNotNull(receiveFrame);
-            System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
-            Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
-            Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
-            assertEquals(msgSize, receiveFrame.getBody().length());
-         }
-
-         // remove susbcription
-         ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
-         unsubFrame.addHeader("id", "a-sub");
-         connV12.sendFrame(unsubFrame);
-
-         connV12.disconnect();
-      } catch (Exception ex) {
-         ex.printStackTrace();
-         throw ex;
-      } finally {
-         cleanUp();
-         server.stop();
-      }
-   }
-
-   //core sender -> large (compressed regular) -> stomp v10 receiver
-   @Test
-   public void testReceiveLargeCompressedToRegularPersistentMessagesFromCore() throws Exception {
-      try {
-         server = createPersistentServerWithStompMinLargeSize(2048);
-         server.start();
-
-         setUpAfterServer(true);
-
-         LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
-         LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-
-         char[] contents = input.toArray();
-         String msg = new String(contents);
-
-         String leadingPart = msg.substring(0, 100);
-
-         int count = 10;
-         for (int i = 0; i < count; i++) {
-            this.sendMessage(msg);
-         }
-
-         String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-         sendFrame(frame);
-         frame = receiveFrame(10000);
-
-         Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-         frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
-         sendFrame(frame);
-
-         for (int i = 0; i < count; i++) {
-            frame = receiveFrame(60000);
-            Assert.assertNotNull(frame);
-            System.out.println("part of frame: " + frame.substring(0, 250));
-            Assert.assertTrue(frame.startsWith("MESSAGE"));
-            Assert.assertTrue(frame.indexOf("destination:") > 0);
-            int index = frame.indexOf(leadingPart);
-            assertEquals(msg.length(), (frame.length() - index));
-         }
-
-         // remove suscription
-         frame = "UNSUBSCRIBE\n" + "destination:" +
-            getQueuePrefix() +
-            getQueueName() +
-            "\n" +
-            "receipt:567\n" +
-            "\n\n" +
-            Stomp.NULL;
-         sendFrame(frame);
-         waitForReceipt();
-
-         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-         sendFrame(frame);
-      } catch (Exception ex) {
-         ex.printStackTrace();
-         throw ex;
-      } finally {
-         cleanUp();
-         server.stop();
-      }
-   }
-
-   //core sender -> large (compressed regular) -> stomp v12 receiver
-   @Test
-   public void testReceiveLargeCompressedToRegularPersistentMessagesFromCoreV12() throws Exception {
-      try {
-         server = createPersistentServerWithStompMinLargeSize(2048);
-         server.start();
-
-         setUpAfterServer(true);
-
-         LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
-         LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-
-         char[] contents = input.toArray();
-         String msg = new String(contents);
-
-         int count = 10;
-         for (int i = 0; i < count; i++) {
-            this.sendMessage(msg);
-         }
-
-         StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
-         connV12.connect(defUser, defPass);
-
-         ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
-         subFrame.addHeader("id", "a-sub");
-         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-         subFrame.addHeader("ack", "auto");
-
-         connV12.sendFrame(subFrame);
-
-         for (int i = 0; i < count; i++) {
-            ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
-
-            Assert.assertNotNull(receiveFrame);
-            System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
-            Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
-            Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
-            assertEquals(contents.length, receiveFrame.getBody().length());
-         }
-
-         // remove susbcription
-         ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
-         unsubFrame.addHeader("id", "a-sub");
-         connV12.sendFrame(unsubFrame);
-
-         connV12.disconnect();
-      } catch (Exception ex) {
-         ex.printStackTrace();
-         throw ex;
-      } finally {
-         cleanUp();
-         server.stop();
-      }
-   }
-
-   //core sender -> large (compressed large) -> stomp v12 receiver
-   @Test
-   public void testReceiveLargeCompressedToLargePersistentMessagesFromCoreV12() throws Exception {
-      try {
-         server = createPersistentServerWithStompMinLargeSize(2048);
-         server.start();
-
-         setUpAfterServer(true);
-
-         LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
-         input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-         LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-
-         char[] contents = input.toArray();
-         String msg = new String(contents);
-
-         int count = 10;
-         for (int i = 0; i < count; i++) {
-            this.sendMessage(msg);
-         }
-
-         StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
-         connV12.connect(defUser, defPass);
-
-         ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
-         subFrame.addHeader("id", "a-sub");
-         subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
-         subFrame.addHeader("ack", "auto");
-
-         connV12.sendFrame(subFrame);
-
-         for (int i = 0; i < count; i++) {
-            ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
-
-            Assert.assertNotNull(receiveFrame);
-            System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
-            Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
-            Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
-            assertEquals(contents.length, receiveFrame.getBody().length());
-         }
-
-         // remove susbcription
-         ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
-         unsubFrame.addHeader("id", "a-sub");
-         connV12.sendFrame(unsubFrame);
-
-         connV12.disconnect();
-      } catch (Exception ex) {
-         ex.printStackTrace();
-         throw ex;
-      } finally {
-         cleanUp();
-         server.stop();
-      }
-   }
-
-   //core sender -> large (compressed large) -> stomp v10 receiver
-   @Test
-   public void testReceiveLargeCompressedToLargePersistentMessagesFromCore() throws Exception {
-      try {
-         server = createPersistentServerWithStompMinLargeSize(2048);
-         server.start();
-
-         setUpAfterServer(true);
-
-         LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
-         input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-         LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-
-         char[] contents = input.toArray();
-         String msg = new String(contents);
-
-         String leadingPart = msg.substring(0, 100);
-
-         int count = 10;
-         for (int i = 0; i < count; i++) {
-            this.sendMessage(msg);
-         }
-
-         String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-         sendFrame(frame);
-         frame = receiveFrame(10000);
-
-         Assert.assertTrue(frame.startsWith("CONNECTED"));
-
-         frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
-         sendFrame(frame);
-
-         for (int i = 0; i < count; i++) {
-            frame = receiveFrame(60000);
-            Assert.assertNotNull(frame);
-            System.out.println("part of frame: " + frame.substring(0, 250));
-            Assert.assertTrue(frame.startsWith("MESSAGE"));
-            Assert.assertTrue(frame.indexOf("destination:") > 0);
-            int index = frame.indexOf(leadingPart);
-            assertEquals(msg.length(), (frame.length() - index));
-         }
-
-         // remove suscription
-         frame = "UNSUBSCRIBE\n" + "destination:" +
-            getQueuePrefix() +
-            getQueueName() +
-            "\n" +
-            "receipt:567\n" +
-            "\n\n" +
-            Stomp.NULL;
-         sendFrame(frame);
-         waitForReceipt();
-
-         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-         sendFrame(frame);
-      } catch (Exception ex) {
-         ex.printStackTrace();
-         throw ex;
-      } finally {
-         cleanUp();
-         server.stop();
-      }
-   }
-
-   protected JMSServerManager createPersistentServerWithStompMinLargeSize(int sz) throws Exception {
-      Map<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
-      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
-      params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
-      params.put(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, sz);
-      TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
-
-      Configuration config = createBasicConfig().setPersistenceEnabled(true).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
-
-      ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
-
-      JMSConfiguration jmsConfig = new JMSConfigurationImpl();
-      jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setBindings(getQueueName()));
-      jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName()));
-      server = new JMSServerManagerImpl(activeMQServer, jmsConfig);
-      server.setRegistry(new JndiBindingRegistry((new InVMNamingContext())));
-      return server;
-   }
-
-   private void enableMessageIDTest(Boolean enable) throws Exception {
-      try {
-         server = createServerWithExtraStompOptions(null, enable);
-         server.start();
-
-         setUpAfterServer();
-
-         String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
-         sendFrame(connect_frame);
-
-         String f = receiveFrame(10000);
-         Assert.assertTrue(f.startsWith("CONNECTED"));
-         Assert.assertTrue(f.indexOf("response-id:1") >= 0);
-
-         String frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World 1" + Stomp.NULL;
-         sendFrame(frame);
-
-         frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World 2" + Stomp.NULL;
-
-         sendFrame(frame);
-
-         QueueBrowser browser = session.createBrowser(queue);
-
-         Enumeration enu = browser.getEnumeration();
-
-         while (enu.hasMoreElements()) {
-            Message msg = (Message) enu.nextElement();
-            String msgId = msg.getStringProperty("amqMessageId");
-            if (enable != null && enable.booleanValue()) {
-               assertNotNull(msgId);
-               assertTrue(msgId.indexOf("STOMP") == 0);
-            } else {
-               assertNull(msgId);
-            }
-         }
-
-         browser.close();
-
-         MessageConsumer consumer = session.createConsumer(queue);
-
-         TextMessage message = (TextMessage) consumer.receive(1000);
-         Assert.assertNotNull(message);
-
-         message = (TextMessage) consumer.receive(1000);
-         Assert.assertNotNull(message);
-
-         message = (TextMessage) consumer.receive(2000);
-         Assert.assertNull(message);
-      } finally {
-         cleanUp();
-         server.stop();
-      }
-   }
-
-   protected JMSServerManager createServerWithTTL(String ttl) throws Exception {
-      return createServerWithExtraStompOptions(ttl, null);
-   }
-
-   protected JMSServerManager createServerWithExtraStompOptions(String ttl, Boolean enableMessageID) throws Exception {
-
-      Map<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
-      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
-      if (ttl != null) {
-         params.put(TransportConstants.CONNECTION_TTL, ttl);
-      }
-      if (enableMessageID != null) {
-         params.put(TransportConstants.STOMP_ENABLE_MESSAGE_ID, enableMessageID);
-      }
-      params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
-      TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
-
-      Configuration config = createBasicConfig().setPersistenceEnabled(false).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY));
-
-      ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
-
-      JMSConfiguration jmsConfig = new JMSConfigurationImpl();
-      jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setDurable(false).setBindings(getQueueName()));
-      jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName()));
-      server = new JMSServerManagerImpl(activeMQServer, jmsConfig);
-      server.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
-      return server;
-   }
-
-   public static class MyCoreInterceptor implements Interceptor {
-
-      static List<Packet> incomingInterceptedFrames = new ArrayList<>();
-
-      @Override
-      public boolean intercept(Packet packet, RemotingConnection connection) {
-         incomingInterceptedFrames.add(packet);
-         return true;
-      }
-
-   }
-
-   public static class MyIncomingStompFrameInterceptor implements StompFrameInterceptor {
-
-      static List<StompFrame> incomingInterceptedFrames = new ArrayList<>();
-
-      @Override
-      public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
-         incomingInterceptedFrames.add(stompFrame);
-         stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal");
-         return true;
-      }
-   }
-
-   public static class MyOutgoingStompFrameInterceptor implements StompFrameInterceptor {
-
-      static List<StompFrame> outgoingInterceptedFrames = new ArrayList<>();
-
-      @Override
-      public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
-         outgoingInterceptedFrames.add(stompFrame);
-         stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal");
-         return true;
-      }
-   }
-
-   @Test
-   public void stompFrameInterceptor() throws Exception {
-      MyIncomingStompFrameInterceptor.incomingInterceptedFrames.clear();
-      MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.clear();
-      try {
-         List<String> incomingInterceptorList = new ArrayList<>();
-         incomingInterceptorList.add("org.apache.activemq.artemis.tests.integration.stomp.ExtraStompTest$MyIncomingStompFrameInterceptor");
-         incomingInterceptorList.add("org.apache.activemq.artemis.tests.integration.stomp.ExtraStompTest$MyCoreInterceptor");
-         List<String> outgoingInterceptorList = new ArrayList<>();
-         outgoingInterceptorList.add("org.apache.activemq.artemis.tests.integration.stomp.ExtraStompTest$MyOutgoingStompFrameInterceptor");
-
-         server = createServerWithStompInterceptor(incomingInterceptorList, outgoingInterceptorList);
-         server.start();
-
-         setUpAfterServer(); // This will make some calls through core
-
-         // So we clear them here
-         MyCoreInterceptor.incomingInterceptedFrames.clear();
-
-         String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-         sendFrame(frame);
-
-         frame = receiveFrame(100000);
-
-         frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
-         sendFrame(frame);
-
-         assertEquals(0, MyCoreInterceptor.incomingInterceptedFrames.size());
-         sendMessage(getName());
-
-         // Something was supposed to be called on sendMessages
-         assertTrue("core interceptor is not working", MyCoreInterceptor.incomingInterceptedFrames.size() > 0);
-
-         receiveFrame(10000);
-
-         frame = "SEND\n" + "destination:" +
-            getQueuePrefix() +
-            getQueueName() +
-            "\n\n" +
-            "Hello World" +
-            Stomp.NULL;
-         sendFrame(frame);
-
-         receiveFrame(10000);
-
-         frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-
-         sendFrame(frame);
-
-      } finally {
-         cleanUp();
-         server.stop();
-      }
-
-      List<String> incomingCommands = new ArrayList<>(4);
-      incomingCommands.add("CONNECT");
-      incomingCommands.add("SUBSCRIBE");
-      incomingCommands.add("SEND");
-      incomingCommands.add("DISCONNECT");
-
-      List<String> outgoingCommands = new ArrayList<>(3);
-      outgoingCommands.add("CONNECTED");
-      outgoingCommands.add("MESSAGE");
-      outgoingCommands.add("MESSAGE");
-
-      long timeout = System.currentTimeMillis() + 1000;
-
-      // Things are async, giving some time to things arrive before we actually assert
-      while (MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() < 4 &&
-         MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() < 3 &&
-         timeout > System.currentTimeMillis()) {
-         Thread.sleep(10);
-      }
-
-      Assert.assertEquals(4, MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size());
-      Assert.assertEquals(3, MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size());
-
-      for (int i = 0; i < MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size(); i++) {
-         Assert.assertEquals(incomingCommands.get(i), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getCommand());
-         Assert.assertEquals("incomingInterceptedVal", MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp"));
-      }
-
-      for (int i = 0; i < MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size(); i++) {
-         Assert.assertEquals(outgoingCommands.get(i), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(i).getCommand());
-      }
-
-      Assert.assertEquals("incomingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp"));
-      Assert.assertEquals("outgoingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
-   }
-
-   protected JMSServerManager createServerWithStompInterceptor(List<String> stompIncomingInterceptor,
-                                                               List<String> stompOutgoingInterceptor) throws Exception {
-
-      Map<String, Object> params = new HashMap<>();
-      params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
-      params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
-      params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
-      TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
-
-      Configuration config = createBasicConfig().setPersistenceEnabled(false).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)).setIncomingInterceptorClassNames(stompIncomingInterceptor).setOutgoingInterceptorClassNames(stompOutgoingInterceptor);
-
-      ActiveMQServer hornetQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
-
-      JMSConfiguration jmsConfig = new JMSConfigurationImpl();
-      jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setDurable(false).setBindings(getQueueName()));
-      jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName()));
-      server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
-      server.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
-      return server;
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java
index 419b339..ac89c1d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java
@@ -19,32 +19,20 @@ package org.apache.activemq.artemis.tests.integration.stomp;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 
-import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.jms.server.JMSServerManager;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
 import org.junit.Test;
 
-public class StompConnectionCleanupTest extends StompTestBase {
+public class StompConnectionCleanupTest extends StompTest {
 
    private static final long CONNECTION_TTL = 2000;
 
    // ARTEMIS-231
    @Test
    public void testConnectionCleanupWithTopicSubscription() throws Exception {
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      frame = receiveFrame(10000);
+      conn.connect(defUser, defPass);
 
-      //We send and consumer a message to ensure a STOMP connection and server session is created
-
-      System.out.println("Received frame: " + frame);
-
-      assertTrue(frame.startsWith("CONNECTED"));
-
-      frame = "SUBSCRIBE\n" + "destination:" + getTopicPrefix() + getTopicName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "DISCONNECT\n\n" + Stomp.NULL;
-      sendFrame(frame);
+      subscribeTopic(conn, null, "auto", null);
 
       // Now we wait until the connection is cleared on the server, which will happen some time after ttl, since no data
       // is being sent
@@ -72,25 +60,16 @@ public class StompConnectionCleanupTest extends StompTestBase {
 
    @Test
    public void testConnectionCleanup() throws Exception {
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      frame = receiveFrame(10000);
+      conn.connect(defUser, defPass);
 
-      //We send and consumer a message to ensure a STOMP connection and server session is created
+      subscribe(conn, null, "auto", null);
 
-      System.out.println("Received frame: " + frame);
+      send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
 
-      assertTrue(frame.startsWith("CONNECTED"));
+      ClientStompFrame frame = conn.receiveFrame(10000);
 
-      frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
-      sendFrame(frame);
-
-      frame = receiveFrame(10000);
-      assertTrue(frame.startsWith("MESSAGE"));
-      assertTrue(frame.indexOf("destination:") > 0);
+      assertTrue(frame.getCommand().equals("MESSAGE"));
+      assertTrue(frame.getHeader("destination").equals(getQueuePrefix() + getQueueName()));
 
       // Now we wait until the connection is cleared on the server, which will happen some time after ttl, since no data
       // is being sent
@@ -118,13 +97,7 @@ public class StompConnectionCleanupTest extends StompTestBase {
 
    @Test
    public void testConnectionNotCleanedUp() throws Exception {
-      String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-      sendFrame(frame);
-      frame = receiveFrame(10000);
-
-      //We send and consumer a message to ensure a STOMP connection and server session is created
-
-      assertTrue(frame.startsWith("CONNECTED"));
+      conn.connect(defUser, defPass);
 
       MessageConsumer consumer = session.createConsumer(queue);
 
@@ -136,8 +109,7 @@ public class StompConnectionCleanupTest extends StompTestBase {
       while (true) {
          //Send and receive a msg
 
-         frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
-         sendFrame(frame);
+         send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
 
          Message msg = consumer.receive(1000);
          assertNotNull(msg);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java
deleted file mode 100644
index 138e37c..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.stomp;
-
-import java.nio.charset.StandardCharsets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
-import io.netty.handler.codec.http.DefaultHttpContent;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpRequestEncoder;
-import io.netty.handler.codec.http.HttpResponseDecoder;
-import io.netty.handler.codec.http.HttpVersion;
-import io.netty.handler.codec.string.StringDecoder;
-import io.netty.handler.codec.string.StringEncoder;
-
-public class StompOverHttpTest extends StompTest {
-
-   @Override
-   protected void addChannelHandlers(int index, SocketChannel ch) {
-      ch.pipeline().addLast(new HttpRequestEncoder());
-      ch.pipeline().addLast(new HttpResponseDecoder());
-      ch.pipeline().addLast(new HttpHandler());
-      ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
-      ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8));
-      ch.pipeline().addLast(new StompClientHandler(index));
-   }
-
-   @Override
-   public String receiveFrame(long timeOut) throws Exception {
-      //we are request/response so may need to send an empty request so we get responses piggy backed
-      sendFrame(new byte[]{});
-      return super.receiveFrame(timeOut);
-   }
-
-   class HttpHandler extends ChannelDuplexHandler {
-
-      @Override
-      public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
-         if (msg instanceof DefaultHttpContent) {
-            DefaultHttpContent response = (DefaultHttpContent) msg;
-            ctx.fireChannelRead(response.content());
-         }
-      }
-
-      @Override
-      public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception {
-         if (msg instanceof ByteBuf) {
-            ByteBuf buf = (ByteBuf) msg;
-            FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "", buf);
-            httpRequest.headers().add(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.readableBytes()));
-            ctx.write(httpRequest, promise);
-         } else {
-            ctx.write(msg, promise);
-         }
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java
deleted file mode 100644
index 95801f7..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.stomp;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpClientCodec;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
-import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketVersion;
-import io.netty.handler.codec.string.StringDecoder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class StompOverWebsocketTest extends StompTest {
-
-   private ChannelPromise handshakeFuture;
-
-   private final boolean useBinaryFrames;
-
-   @Parameterized.Parameters(name = "useBinaryFrames={0}")
-   public static Collection<Object[]> data() {
-      List<Object[]> list = Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
-      return list;
-   }
-
-   public StompOverWebsocketTest(Boolean useBinaryFrames) {
-      super();
-      this.useBinaryFrames = useBinaryFrames;
-   }
-
-   @Override
-   protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException {
-      ch.pipeline().addLast("http-codec", new HttpClientCodec());
-      ch.pipeline().addLast("aggregator", new HttpObjectAggregator(8192));
-      ch.pipeline().addLast(new WebsocketHandler(WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://localhost:8080/websocket"), WebSocketVersion.V13, null, false, null)));
-      ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
-      ch.pipeline().addLast(new StompClientHandler(index));
-   }
-
-   @Override
-   protected void handshake() throws InterruptedException {
-      handshakeFuture.sync();
-   }
-
-   class WebsocketHandler extends ChannelDuplexHandler {
-
-      private WebSocketClientHandshaker handshaker;
-
-      WebsocketHandler(WebSocketClientHandshaker webSocketClientHandshaker) {
-         this.handshaker = webSocketClientHandshaker;
-      }
-
-      @Override
-      public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
-         handshakeFuture = ctx.newPromise();
-      }
-
-      @Override
-      public void channelActive(ChannelHandlerContext ctx) throws Exception {
-         handshaker.handshake(ctx.channel());
-      }
-
-      @Override
-      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-         System.out.println("WebSocket Client disconnected!");
-      }
-
-      @Override
-      public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
-         Channel ch = ctx.channel();
-         if (!handshaker.isHandshakeComplete()) {
-            handshaker.finishHandshake(ch, (FullHttpResponse) msg);
-            System.out.println("WebSocket Client connected!");
-            handshakeFuture.setSuccess();
-            return;
-         }
-
-         if (msg instanceof FullHttpResponse) {
-            FullHttpResponse response = (FullHttpResponse) msg;
-            throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
-         }
-
-         WebSocketFrame frame = (WebSocketFrame) msg;
-         if (frame instanceof BinaryWebSocketFrame) {
-            BinaryWebSocketFrame dataFrame = (BinaryWebSocketFrame) frame;
-            super.channelRead(ctx, dataFrame.content());
-         } else if (frame instanceof PongWebSocketFrame) {
-            System.out.println("WebSocket Client received pong");
-         } else if (frame instanceof CloseWebSocketFrame) {
-            System.out.println("WebSocket Client received closing");
-            ch.close();
-         }
-      }
-
-      @Override
-      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
-         try {
-            if (msg instanceof String) {
-               ctx.write(createFrame((String) msg), promise);
-            } else {
-               super.write(ctx, msg, promise);
-            }
-         } catch (Exception e) {
-            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
-         }
-      }
-   }
-
-   protected WebSocketFrame createFrame(String msg) {
-      if (useBinaryFrames) {
-         return new BinaryWebSocketFrame(Unpooled.copiedBuffer(msg, StandardCharsets.UTF_8));
-      } else {
-         return new TextWebSocketFrame(msg);
-      }
-   }
-
-}