You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/04/30 21:00:05 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5755

Repository: activemq
Updated Branches:
  refs/heads/master 82200b6e7 -> f05f83b15


https://issues.apache.org/jira/browse/AMQ-5755

Unit tests for some STOMP over WebSockets functionality and some fixes
for resource cleanup.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f05f83b1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f05f83b1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f05f83b1

Branch: refs/heads/master
Commit: f05f83b15dbee58927c4b231ece0419122bff4bf
Parents: 82200b6
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Apr 30 14:41:59 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Apr 30 14:41:59 2015 -0400

----------------------------------------------------------------------
 .../transport/ws/AbstractStompSocket.java       |  17 +-
 .../transport/ws/jetty8/StompSocket.java        |  19 +-
 .../transport/ws/jetty9/StompSocket.java        |   9 +
 .../transport/ws/StompWSConnection.java         | 147 ++++++++++
 .../transport/ws/StompWSTransportTest.java      | 283 +++++++++++++++++++
 .../activemq/transport/ws/WSTransportTest.java  |  68 +----
 .../transport/ws/WSTransportTestSupport.java    | 160 +++++++++++
 .../src/test/resources/log4j.properties         |   2 +
 8 files changed, 643 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
index b74bf5f..472561a 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
@@ -61,11 +61,9 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
     }
 
     @Override
-    public abstract void sendToStomp(StompFrame command) throws IOException;
-
-    @Override
     protected void doStop(ServiceStopper stopper) throws Exception {
         stompInactivityMonitor.stop();
+        handleStopped();
     }
 
     @Override
@@ -74,6 +72,19 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
         stompInactivityMonitor.setTransportListener(getTransportListener());
     }
 
+    //----- Abstract methods for subclasses to implement ---------------------//
+
+    @Override
+    public abstract void sendToStomp(StompFrame command) throws IOException;
+
+    /**
+     * Called when the transport is stopping to allow the dervied classes
+     * a chance to close WebSocket resources.
+     *
+     * @throws IOException if an error occurs during the stop.
+     */
+    public abstract void handleStopped() throws IOException;
+
     //----- Accessor methods -------------------------------------------------//
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
index be1c8d1..23357bd 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
@@ -35,6 +35,20 @@ class StompSocket extends AbstractStompSocket implements WebSocket.OnTextMessage
     private Connection outbound;
 
     @Override
+    public void handleStopped() throws IOException {
+        if (outbound != null && outbound.isOpen()) {
+            outbound.close();
+        }
+    }
+
+    @Override
+    public void sendToStomp(StompFrame command) throws IOException {
+        outbound.sendMessage(command.format());
+    }
+
+    //----- WebSocket.OnTextMessage callback handlers ------------------------//
+
+    @Override
     public void onOpen(Connection connection) {
         this.outbound = connection;
     }
@@ -52,9 +66,4 @@ class StompSocket extends AbstractStompSocket implements WebSocket.OnTextMessage
     public void onMessage(String data) {
         processStompFrame(data);
     }
-
-    @Override
-    public void sendToStomp(StompFrame command) throws IOException {
-        outbound.sendMessage(command.format());
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
index a07ccd0..be7dc30 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
@@ -41,6 +41,15 @@ class StompSocket extends AbstractStompSocket implements WebSocketListener {
     }
 
     @Override
+    public void handleStopped() throws IOException {
+        if (session != null && session.isOpen()) {
+            session.close();
+        }
+    }
+
+    //----- WebSocketListener event callbacks --------------------------------//
+
+    @Override
     public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java
new file mode 100644
index 0000000..09ec106
--- /dev/null
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * STOMP over WS based Connection class
+ */
+public class StompWSConnection implements WebSocket, WebSocket.OnTextMessage {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StompWSConnection.class);
+
+    private Connection connection;
+    private final CountDownLatch connectLatch = new CountDownLatch(1);
+
+    private final BlockingQueue<String> prefetch = new LinkedBlockingDeque<String>();
+
+    private int closeCode = -1;
+    private String closeMessage;
+
+    public boolean isConnected() {
+        return connection != null ? connection.isOpen() : false;
+    }
+
+    public void close() {
+        if (connection != null) {
+            connection.close();
+        }
+    }
+
+    //---- Send methods ------------------------------------------------------//
+
+    public void sendRawFrame(String rawFrame) throws Exception {
+        checkConnected();
+        connection.sendMessage(rawFrame);
+    }
+
+    public void sendFrame(StompFrame frame) throws Exception {
+        checkConnected();
+        connection.sendMessage(frame.format());
+    }
+
+    public void keepAlive() throws Exception {
+        checkConnected();
+        connection.sendMessage("\n");
+    }
+
+    //----- Receive methods --------------------------------------------------//
+
+    public String receive() throws Exception {
+        checkConnected();
+        return prefetch.take();
+    }
+
+    public String receive(long timeout, TimeUnit unit) throws Exception {
+        checkConnected();
+        return prefetch.poll(timeout, unit);
+    }
+
+    public String receiveNoWait() throws Exception {
+        checkConnected();
+        return prefetch.poll();
+    }
+
+    //---- Blocking state change calls ---------------------------------------//
+
+    public void awaitConnection() throws InterruptedException {
+        connectLatch.await();
+    }
+
+    public boolean awaitConnection(long time, TimeUnit unit) throws InterruptedException {
+        return connectLatch.await(time, unit);
+    }
+
+    //----- Property Accessors -----------------------------------------------//
+
+    public int getCloseCode() {
+        return closeCode;
+    }
+
+    public String getCloseMessage() {
+        return closeMessage;
+    }
+
+    //----- WebSocket callback handlers --------------------------------------//
+
+    @Override
+    public void onMessage(String data) {
+        if (data == null) {
+            return;
+        }
+
+        if (data.equals("\n")) {
+            LOG.debug("New incoming heartbeat read");
+        } else {
+            LOG.trace("New incoming STOMP Frame read: \n{}", data);
+            prefetch.add(data);
+        }
+    }
+
+    @Override
+    public void onOpen(Connection connection) {
+        this.connection = connection;
+        this.connectLatch.countDown();
+    }
+
+    @Override
+    public void onClose(int closeCode, String message) {
+        LOG.trace("STOMP WS Connection closed, code:{} message:{}", closeCode, message);
+
+        this.connection = null;
+        this.closeCode = closeCode;
+        this.closeMessage = message;
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    private void checkConnected() throws IOException {
+        if (!isConnected()) {
+            throw new IOException("STOMP WS Connection is closed.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
new file mode 100644
index 0000000..c6bfdd4
--- /dev/null
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.util.Wait;
+import org.eclipse.jetty.websocket.WebSocketClient;
+import org.eclipse.jetty.websocket.WebSocketClientFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test STOMP over WebSockets functionality.
+ */
+public class StompWSTransportTest extends WSTransportTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StompWSTransportTest.class);
+
+    protected WebSocketClient wsClient;
+    protected StompWSConnection wsStompConnection;
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+
+        WebSocketClientFactory clientFactory = new WebSocketClientFactory();
+        clientFactory.start();
+
+        wsClient = clientFactory.newWebSocketClient();
+        wsStompConnection = new StompWSConnection();
+
+        wsClient.open(wsConnectUri, wsStompConnection);
+        if (!wsStompConnection.awaitConnection(30, TimeUnit.SECONDS)) {
+            throw new IOException("Could not connect to STOMP WS endpoint");
+        }
+    }
+
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        if (wsStompConnection != null) {
+            wsStompConnection.close();
+            wsStompConnection = null;
+            wsClient = null;
+        }
+
+        super.tearDown();
+    }
+
+    @Test(timeout = 60000)
+    public void testConnect() throws Exception {
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.2\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        wsStompConnection.sendRawFrame(connectFrame);
+
+        String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+        assertNotNull(incoming);
+        assertTrue(incoming.startsWith("CONNECTED"));
+
+        assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 1;
+            }
+        }));
+
+        wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
+        wsStompConnection.close();
+
+        assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 0;
+            }
+        }));
+    }
+
+    @Test(timeout = 60000)
+    public void testConnectWithVersionOptions() throws Exception {
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.0,1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        wsStompConnection.sendRawFrame(connectFrame);
+
+        String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+
+        assertTrue(incoming.startsWith("CONNECTED"));
+        assertTrue(incoming.indexOf("version:1.1") >= 0);
+        assertTrue(incoming.indexOf("session:") >= 0);
+
+        wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
+        wsStompConnection.close();
+    }
+
+    @Test(timeout = 60000)
+    public void testRejectInvalidHeartbeats1() throws Exception {
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:0\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        wsStompConnection.sendRawFrame(connectFrame);
+
+        String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+
+        assertTrue(incoming.startsWith("ERROR"));
+        assertTrue(incoming.indexOf("heart-beat") >= 0);
+        assertTrue(incoming.indexOf("message:") >= 0);
+
+        assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 0;
+            }
+        }));
+    }
+
+    @Test(timeout = 60000)
+    public void testRejectInvalidHeartbeats2() throws Exception {
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:T,0\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        wsStompConnection.sendRawFrame(connectFrame);
+
+        String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+
+        assertTrue(incoming.startsWith("ERROR"));
+        assertTrue(incoming.indexOf("heart-beat") >= 0);
+        assertTrue(incoming.indexOf("message:") >= 0);
+
+        assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 0;
+            }
+        }));
+    }
+
+    @Test(timeout = 60000)
+    public void testRejectInvalidHeartbeats3() throws Exception {
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:100,10,50\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        wsStompConnection.sendRawFrame(connectFrame);
+
+        String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+
+        assertTrue(incoming.startsWith("ERROR"));
+        assertTrue(incoming.indexOf("heart-beat") >= 0);
+        assertTrue(incoming.indexOf("message:") >= 0);
+
+        assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return getProxyToBroker().getCurrentConnectionsCount() == 0;
+            }
+        }));
+    }
+
+    @Test(timeout = 60000)
+    public void testHeartbeatsDropsIdleConnection() throws Exception {
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:1000,0\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        wsStompConnection.sendRawFrame(connectFrame);
+        String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+        assertTrue(incoming.startsWith("CONNECTED"));
+        assertTrue(incoming.indexOf("version:1.1") >= 0);
+        assertTrue(incoming.indexOf("heart-beat:") >= 0);
+        assertTrue(incoming.indexOf("session:") >= 0);
+
+        assertTrue("Broker should have closed WS connection:", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return !wsStompConnection.isConnected();
+            }
+        }));
+    }
+
+    @Test(timeout = 60000)
+    public void testHeartbeatsKeepsConnectionOpen() throws Exception {
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:2000,0\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        wsStompConnection.sendRawFrame(connectFrame);
+        String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+        assertTrue(incoming.startsWith("CONNECTED"));
+        assertTrue(incoming.indexOf("version:1.1") >= 0);
+        assertTrue(incoming.indexOf("heart-beat:") >= 0);
+        assertTrue(incoming.indexOf("session:") >= 0);
+
+        String message = "SEND\n" + "destination:/queue/" + getTestName() + "\n\n" + "Hello World" + Stomp.NULL;
+        wsStompConnection.sendRawFrame(message);
+
+        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+
+        service.scheduleAtFixedRate(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    LOG.info("Sending next KeepAlive");
+                    wsStompConnection.keepAlive();
+                } catch (Exception e) {
+                }
+            }
+        }, 1, 1, TimeUnit.SECONDS);
+
+        TimeUnit.SECONDS.sleep(15);
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getTestName() + "\n" +
+                       "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+        wsStompConnection.sendRawFrame(frame);
+
+        incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+        assertTrue(incoming.startsWith("MESSAGE"));
+
+        service.shutdownNow();
+        service.awaitTermination(5, TimeUnit.SECONDS);
+
+        wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
index 140356e..546209e 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
@@ -21,29 +21,22 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 
-import javax.net.ServerSocketFactory;
-
-import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.spring.SpringSslContext;
 import org.apache.activemq.transport.SocketConnectorFactory;
 import org.apache.activemq.transport.stomp.StompConnection;
 import org.apache.activemq.util.Wait;
 import org.eclipse.jetty.server.Connector;
 import org.eclipse.jetty.server.Server;
 import org.eclipse.jetty.webapp.WebAppContext;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
-
 import org.openqa.selenium.By;
 import org.openqa.selenium.WebDriver;
 import org.openqa.selenium.WebElement;
@@ -54,43 +47,25 @@ import org.openqa.selenium.firefox.FirefoxProfile;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class WSTransportTest {
+public class WSTransportTest extends WSTransportTestSupport {
 
     private static final Logger LOG = LoggerFactory.getLogger(WSTransportTest.class);
     private static final int MESSAGE_COUNT = 1000;
 
-    private BrokerService broker;
     private Server server;
     private WebDriver driver;
     private File profileDir;
 
     private String stompUri;
-    private int proxyPort = 0;
-    protected String wsUri;
 
     private StompConnection stompConnection = new StompConnection();
 
-    protected BrokerService createBroker(boolean deleteMessages) throws Exception {
-        BrokerService broker = BrokerFactory.createBroker(
-                new URI("broker:()/localhost?persistent=false&useJmx=false"));
-
-        SpringSslContext context = new SpringSslContext();
-        context.setKeyStore("src/test/resources/server.keystore");
-        context.setKeyStoreKeyPassword("password");
-        context.setTrustStore("src/test/resources/client.keystore");
-        context.setTrustStorePassword("password");
-        context.afterPropertiesSet();
-        broker.setSslContext(context);
-
-        stompUri = broker.addConnector("stomp://localhost:0").getPublishableConnectString();
-        wsUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectString();
-        broker.setDeleteAllMessagesOnStartup(deleteMessages);
-        broker.start();
-        broker.waitUntilStarted();
-
-        return broker;
+    @Override
+    protected void addAdditionalConnectors(BrokerService service) throws Exception {
+        stompUri = service.addConnector("stomp://localhost:0").getPublishableConnectString();
     }
 
+    @Override
     protected String getWSConnectorURI() {
         return "ws://127.0.0.1:61623?websocket.maxTextMessageSize=99999&transport.maxIdleTime=1001";
     }
@@ -114,31 +89,13 @@ public class WSTransportTest {
         return server;
     }
 
-    protected int getProxyPort() {
-        if (proxyPort == 0) {
-            ServerSocket ss = null;
-            try {
-                ss = ServerSocketFactory.getDefault().createServerSocket(0);
-                proxyPort = ss.getLocalPort();
-            } catch (IOException e) { // ignore
-            } finally {
-                try {
-                    if (ss != null ) {
-                        ss.close();
-                    }
-                } catch (IOException e) { // ignore
-                }
-            }
-        }
-        return proxyPort;
-    }
-
     protected Connector createJettyConnector(Server server) throws Exception {
         Connector c = new SocketConnectorFactory().createConnector(server);
         c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort());
         return c;
     }
 
+    @Override
     protected void stopBroker() throws Exception {
         if (broker != null) {
             broker.stop();
@@ -147,14 +104,16 @@ public class WSTransportTest {
         }
     }
 
+    @Override
     @Before
     public void setUp() throws Exception {
+        super.setUp();
         profileDir = new File("activemq-data/profiles");
-        broker = createBroker(true);
         stompConnect();
         server = createWebServer();
     }
 
+    @Override
     @After
     public void tearDown() throws Exception {
         try {
@@ -163,10 +122,11 @@ public class WSTransportTest {
             // Some tests explicitly disconnect from stomp so can ignore
         } finally {
             try {
-                stopBroker();
-            } catch (Exception e) {
-                LOG.warn("Error on Broker stop.");
+                super.tearDown();
+            } catch (Exception ex) {
+                LOG.warn("Error on super tearDown()");
             }
+
             if (driver != null) {
                 try {
                     driver.quit();
@@ -234,7 +194,7 @@ public class WSTransportTest {
 
     protected String getTestURI() {
         int port = getProxyPort();
-        return "http://localhost:" + port + "/websocket.html#" + wsUri;
+        return "http://localhost:" + port + "/websocket.html#" + wsConnectUri;
     }
 
     public void doTestWebSockets(WebDriver driver) throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
new file mode 100644
index 0000000..9c4abc8
--- /dev/null
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.URI;
+
+import javax.jms.JMSException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.net.ServerSocketFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.spring.SpringSslContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Basic infrastructure for test WebSocket connections.
+ */
+public class WSTransportTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(WSTransportTestSupport.class);
+
+    @Rule
+    public TestName name = new TestName();
+
+    private int proxyPort = 0;
+
+    protected BrokerService broker;
+    protected URI wsConnectUri;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = createBroker(true);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            stopBroker();
+        } catch(Exception e) {
+            LOG.warn("Error on Broker stop.");
+        }
+    }
+
+    protected String getWSConnectorURI() {
+        return "ws://127.0.0.1:" + getProxyPort() + "?websocket.maxTextMessageSize=99999&transport.maxIdleTime=1001";
+    }
+
+    protected void addAdditionalConnectors(BrokerService service) throws Exception {
+
+    }
+
+    protected BrokerService createBroker(boolean deleteMessages) throws Exception {
+
+        BrokerService broker = new BrokerService();
+
+        SpringSslContext context = new SpringSslContext();
+        context.setKeyStore("src/test/resources/server.keystore");
+        context.setKeyStoreKeyPassword("password");
+        context.setTrustStore("src/test/resources/client.keystore");
+        context.setTrustStorePassword("password");
+        context.afterPropertiesSet();
+        broker.setSslContext(context);
+
+        wsConnectUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectURI();
+
+        broker.setUseJmx(true);
+        broker.getManagementContext().setCreateConnector(false);
+        broker.setPersistent(isPersistent());
+        broker.setDeleteAllMessagesOnStartup(deleteMessages);
+        broker.start();
+        broker.waitUntilStarted();
+
+        addAdditionalConnectors(broker);
+
+        return broker;
+    }
+
+    protected boolean isPersistent() {
+        return false;
+    }
+
+    protected String getTestName() {
+        return name.getMethodName();
+    }
+
+    protected int getProxyPort() {
+        if (proxyPort == 0) {
+            ServerSocket ss = null;
+            try {
+                ss = ServerSocketFactory.getDefault().createServerSocket(0);
+                proxyPort = ss.getLocalPort();
+            } catch (IOException e) { // ignore
+            } finally {
+                try {
+                    if (ss != null ) {
+                        ss.close();
+                    }
+                } catch (IOException e) { // ignore
+                }
+            }
+        }
+
+        return proxyPort;
+    }
+
+    protected void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+    }
+
+    protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
+        ObjectName brokerViewMBean = new ObjectName(
+            "org.apache.activemq:type=Broker,brokerName=localhost");
+        BrokerViewMBean proxy = (BrokerViewMBean) broker.getManagementContext()
+                .newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
+        return proxy;
+    }
+
+    protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+        QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+                .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+
+    protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
+        ObjectName topicViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
+        TopicViewMBean proxy = (TopicViewMBean) broker.getManagementContext()
+                .newProxyInstance(topicViewMBeanName, TopicViewMBean.class, true);
+        return proxy;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/resources/log4j.properties b/activemq-http/src/test/resources/log4j.properties
index 7cc1941..f28c2a8 100755
--- a/activemq-http/src/test/resources/log4j.properties
+++ b/activemq-http/src/test/resources/log4j.properties
@@ -20,6 +20,8 @@
 #
 log4j.rootLogger=INFO, out, stdout
 
+log4j.logger.org.apache.activemq.transport.ws=DEBUG
+
 #log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
 #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
 #log4j.logger.org.apache.activemq.transport.failover=TRACE