You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/04/30 21:00:05 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5755
Repository: activemq
Updated Branches:
refs/heads/master 82200b6e7 -> f05f83b15
https://issues.apache.org/jira/browse/AMQ-5755
Unit tests for some STOMP over WebSockets functionality and some fixes
for resource cleanup.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f05f83b1
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f05f83b1
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f05f83b1
Branch: refs/heads/master
Commit: f05f83b15dbee58927c4b231ece0419122bff4bf
Parents: 82200b6
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Apr 30 14:41:59 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Apr 30 14:41:59 2015 -0400
----------------------------------------------------------------------
.../transport/ws/AbstractStompSocket.java | 17 +-
.../transport/ws/jetty8/StompSocket.java | 19 +-
.../transport/ws/jetty9/StompSocket.java | 9 +
.../transport/ws/StompWSConnection.java | 147 ++++++++++
.../transport/ws/StompWSTransportTest.java | 283 +++++++++++++++++++
.../activemq/transport/ws/WSTransportTest.java | 68 +----
.../transport/ws/WSTransportTestSupport.java | 160 +++++++++++
.../src/test/resources/log4j.properties | 2 +
8 files changed, 643 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
index b74bf5f..472561a 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/AbstractStompSocket.java
@@ -61,11 +61,9 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
}
@Override
- public abstract void sendToStomp(StompFrame command) throws IOException;
-
- @Override
protected void doStop(ServiceStopper stopper) throws Exception {
stompInactivityMonitor.stop();
+ handleStopped();
}
@Override
@@ -74,6 +72,19 @@ public abstract class AbstractStompSocket extends TransportSupport implements St
stompInactivityMonitor.setTransportListener(getTransportListener());
}
+ //----- Abstract methods for subclasses to implement ---------------------//
+
+ @Override
+ public abstract void sendToStomp(StompFrame command) throws IOException;
+
+ /**
+ * Called when the transport is stopping to allow the derived classes
+ * a chance to close WebSocket resources.
+ *
+ * @throws IOException if an error occurs during the stop.
+ */
+ public abstract void handleStopped() throws IOException;
+
//----- Accessor methods -------------------------------------------------//
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
index be1c8d1..23357bd 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty8/StompSocket.java
@@ -35,6 +35,20 @@ class StompSocket extends AbstractStompSocket implements WebSocket.OnTextMessage
private Connection outbound;
@Override
+ public void handleStopped() throws IOException {
+ if (outbound != null && outbound.isOpen()) {
+ outbound.close();
+ }
+ }
+
+ @Override
+ public void sendToStomp(StompFrame command) throws IOException {
+ outbound.sendMessage(command.format());
+ }
+
+ //----- WebSocket.OnTextMessage callback handlers ------------------------//
+
+ @Override
public void onOpen(Connection connection) {
this.outbound = connection;
}
@@ -52,9 +66,4 @@ class StompSocket extends AbstractStompSocket implements WebSocket.OnTextMessage
public void onMessage(String data) {
processStompFrame(data);
}
-
- @Override
- public void sendToStomp(StompFrame command) throws IOException {
- outbound.sendMessage(command.format());
- }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
index a07ccd0..be7dc30 100644
--- a/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
+++ b/activemq-http/src/main/java/org/apache/activemq/transport/ws/jetty9/StompSocket.java
@@ -41,6 +41,15 @@ class StompSocket extends AbstractStompSocket implements WebSocketListener {
}
@Override
+ public void handleStopped() throws IOException {
+ if (session != null && session.isOpen()) {
+ session.close();
+ }
+ }
+
+ //----- WebSocketListener event callbacks --------------------------------//
+
+ @Override
public void onWebSocketBinary(byte[] arg0, int arg1, int arg2) {
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java
new file mode 100644
index 0000000..09ec106
--- /dev/null
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSConnection.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * STOMP over WS based Connection class
+ */
+public class StompWSConnection implements WebSocket, WebSocket.OnTextMessage {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StompWSConnection.class);
+
+ private Connection connection;
+ private final CountDownLatch connectLatch = new CountDownLatch(1);
+
+ private final BlockingQueue<String> prefetch = new LinkedBlockingDeque<String>();
+
+ private int closeCode = -1;
+ private String closeMessage;
+
+ public boolean isConnected() {
+ return connection != null ? connection.isOpen() : false;
+ }
+
+ public void close() {
+ if (connection != null) {
+ connection.close();
+ }
+ }
+
+ //---- Send methods ------------------------------------------------------//
+
+ public void sendRawFrame(String rawFrame) throws Exception {
+ checkConnected();
+ connection.sendMessage(rawFrame);
+ }
+
+ public void sendFrame(StompFrame frame) throws Exception {
+ checkConnected();
+ connection.sendMessage(frame.format());
+ }
+
+ public void keepAlive() throws Exception {
+ checkConnected();
+ connection.sendMessage("\n");
+ }
+
+ //----- Receive methods --------------------------------------------------//
+
+ public String receive() throws Exception {
+ checkConnected();
+ return prefetch.take();
+ }
+
+ public String receive(long timeout, TimeUnit unit) throws Exception {
+ checkConnected();
+ return prefetch.poll(timeout, unit);
+ }
+
+ public String receiveNoWait() throws Exception {
+ checkConnected();
+ return prefetch.poll();
+ }
+
+ //---- Blocking state change calls ---------------------------------------//
+
+ public void awaitConnection() throws InterruptedException {
+ connectLatch.await();
+ }
+
+ public boolean awaitConnection(long time, TimeUnit unit) throws InterruptedException {
+ return connectLatch.await(time, unit);
+ }
+
+ //----- Property Accessors -----------------------------------------------//
+
+ public int getCloseCode() {
+ return closeCode;
+ }
+
+ public String getCloseMessage() {
+ return closeMessage;
+ }
+
+ //----- WebSocket callback handlers --------------------------------------//
+
+ @Override
+ public void onMessage(String data) {
+ if (data == null) {
+ return;
+ }
+
+ if (data.equals("\n")) {
+ LOG.debug("New incoming heartbeat read");
+ } else {
+ LOG.trace("New incoming STOMP Frame read: \n{}", data);
+ prefetch.add(data);
+ }
+ }
+
+ @Override
+ public void onOpen(Connection connection) {
+ this.connection = connection;
+ this.connectLatch.countDown();
+ }
+
+ @Override
+ public void onClose(int closeCode, String message) {
+ LOG.trace("STOMP WS Connection closed, code:{} message:{}", closeCode, message);
+
+ this.connection = null;
+ this.closeCode = closeCode;
+ this.closeMessage = message;
+ }
+
+ //----- Internal implementation ------------------------------------------//
+
+ private void checkConnected() throws IOException {
+ if (!isConnected()) {
+ throw new IOException("STOMP WS Connection is closed.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
new file mode 100644
index 0000000..c6bfdd4
--- /dev/null
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/StompWSTransportTest.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.transport.stomp.Stomp;
+import org.apache.activemq.transport.stomp.StompFrame;
+import org.apache.activemq.util.Wait;
+import org.eclipse.jetty.websocket.WebSocketClient;
+import org.eclipse.jetty.websocket.WebSocketClientFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test STOMP over WebSockets functionality.
+ */
+public class StompWSTransportTest extends WSTransportTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StompWSTransportTest.class);
+
+ protected WebSocketClient wsClient;
+ protected StompWSConnection wsStompConnection;
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ WebSocketClientFactory clientFactory = new WebSocketClientFactory();
+ clientFactory.start();
+
+ wsClient = clientFactory.newWebSocketClient();
+ wsStompConnection = new StompWSConnection();
+
+ wsClient.open(wsConnectUri, wsStompConnection);
+ if (!wsStompConnection.awaitConnection(30, TimeUnit.SECONDS)) {
+ throw new IOException("Could not connect to STOMP WS endpoint");
+ }
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ if (wsStompConnection != null) {
+ wsStompConnection.close();
+ wsStompConnection = null;
+ wsClient = null;
+ }
+
+ super.tearDown();
+ }
+
+ @Test(timeout = 60000)
+ public void testConnect() throws Exception {
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.2\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+
+ wsStompConnection.sendRawFrame(connectFrame);
+
+ String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+ assertNotNull(incoming);
+ assertTrue(incoming.startsWith("CONNECTED"));
+
+ assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() == 1;
+ }
+ }));
+
+ wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
+ wsStompConnection.close();
+
+ assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() == 0;
+ }
+ }));
+ }
+
+ @Test(timeout = 60000)
+ public void testConnectWithVersionOptions() throws Exception {
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.0,1.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ wsStompConnection.sendRawFrame(connectFrame);
+
+ String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+
+ assertTrue(incoming.startsWith("CONNECTED"));
+ assertTrue(incoming.indexOf("version:1.1") >= 0);
+ assertTrue(incoming.indexOf("session:") >= 0);
+
+ wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
+ wsStompConnection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testRejectInvalidHeartbeats1() throws Exception {
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ wsStompConnection.sendRawFrame(connectFrame);
+
+ String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+
+ assertTrue(incoming.startsWith("ERROR"));
+ assertTrue(incoming.indexOf("heart-beat") >= 0);
+ assertTrue(incoming.indexOf("message:") >= 0);
+
+ assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() == 0;
+ }
+ }));
+ }
+
+ @Test(timeout = 60000)
+ public void testRejectInvalidHeartbeats2() throws Exception {
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:T,0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ wsStompConnection.sendRawFrame(connectFrame);
+
+ String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+
+ assertTrue(incoming.startsWith("ERROR"));
+ assertTrue(incoming.indexOf("heart-beat") >= 0);
+ assertTrue(incoming.indexOf("message:") >= 0);
+
+ assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() == 0;
+ }
+ }));
+ }
+
+ @Test(timeout = 60000)
+ public void testRejectInvalidHeartbeats3() throws Exception {
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:100,10,50\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ wsStompConnection.sendRawFrame(connectFrame);
+
+ String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+
+ assertTrue(incoming.startsWith("ERROR"));
+ assertTrue(incoming.indexOf("heart-beat") >= 0);
+ assertTrue(incoming.indexOf("message:") >= 0);
+
+ assertTrue("Connection should close", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return getProxyToBroker().getCurrentConnectionsCount() == 0;
+ }
+ }));
+ }
+
+ @Test(timeout = 60000)
+ public void testHeartbeatsDropsIdleConnection() throws Exception {
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:1000,0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+
+ wsStompConnection.sendRawFrame(connectFrame);
+ String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+ assertTrue(incoming.startsWith("CONNECTED"));
+ assertTrue(incoming.indexOf("version:1.1") >= 0);
+ assertTrue(incoming.indexOf("heart-beat:") >= 0);
+ assertTrue(incoming.indexOf("session:") >= 0);
+
+ assertTrue("Broker should have closed WS connection:", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return !wsStompConnection.isConnected();
+ }
+ }));
+ }
+
+ @Test(timeout = 60000)
+ public void testHeartbeatsKeepsConnectionOpen() throws Exception {
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:2000,0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+
+ wsStompConnection.sendRawFrame(connectFrame);
+ String incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+ assertTrue(incoming.startsWith("CONNECTED"));
+ assertTrue(incoming.indexOf("version:1.1") >= 0);
+ assertTrue(incoming.indexOf("heart-beat:") >= 0);
+ assertTrue(incoming.indexOf("session:") >= 0);
+
+ String message = "SEND\n" + "destination:/queue/" + getTestName() + "\n\n" + "Hello World" + Stomp.NULL;
+ wsStompConnection.sendRawFrame(message);
+
+ ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+
+ service.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Sending next KeepAlive");
+ wsStompConnection.keepAlive();
+ } catch (Exception e) {
+ }
+ }
+ }, 1, 1, TimeUnit.SECONDS);
+
+ TimeUnit.SECONDS.sleep(15);
+
+ String frame = "SUBSCRIBE\n" + "destination:/queue/" + getTestName() + "\n" +
+ "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+ wsStompConnection.sendRawFrame(frame);
+
+ incoming = wsStompConnection.receive(30, TimeUnit.SECONDS);
+ assertTrue(incoming.startsWith("MESSAGE"));
+
+ service.shutdownNow();
+ service.awaitTermination(5, TimeUnit.SECONDS);
+
+ wsStompConnection.sendFrame(new StompFrame(Stomp.Commands.DISCONNECT));
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
index 140356e..546209e 100644
--- a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTest.java
@@ -21,29 +21,22 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
-import java.net.ServerSocket;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
-import javax.net.ServerSocketFactory;
-
-import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.spring.SpringSslContext;
import org.apache.activemq.transport.SocketConnectorFactory;
import org.apache.activemq.transport.stomp.StompConnection;
import org.apache.activemq.util.Wait;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.webapp.WebAppContext;
-
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-
import org.openqa.selenium.By;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.WebElement;
@@ -54,43 +47,25 @@ import org.openqa.selenium.firefox.FirefoxProfile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class WSTransportTest {
+public class WSTransportTest extends WSTransportTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(WSTransportTest.class);
private static final int MESSAGE_COUNT = 1000;
- private BrokerService broker;
private Server server;
private WebDriver driver;
private File profileDir;
private String stompUri;
- private int proxyPort = 0;
- protected String wsUri;
private StompConnection stompConnection = new StompConnection();
- protected BrokerService createBroker(boolean deleteMessages) throws Exception {
- BrokerService broker = BrokerFactory.createBroker(
- new URI("broker:()/localhost?persistent=false&useJmx=false"));
-
- SpringSslContext context = new SpringSslContext();
- context.setKeyStore("src/test/resources/server.keystore");
- context.setKeyStoreKeyPassword("password");
- context.setTrustStore("src/test/resources/client.keystore");
- context.setTrustStorePassword("password");
- context.afterPropertiesSet();
- broker.setSslContext(context);
-
- stompUri = broker.addConnector("stomp://localhost:0").getPublishableConnectString();
- wsUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectString();
- broker.setDeleteAllMessagesOnStartup(deleteMessages);
- broker.start();
- broker.waitUntilStarted();
-
- return broker;
+ @Override
+ protected void addAdditionalConnectors(BrokerService service) throws Exception {
+ stompUri = service.addConnector("stomp://localhost:0").getPublishableConnectString();
}
+ @Override
protected String getWSConnectorURI() {
return "ws://127.0.0.1:61623?websocket.maxTextMessageSize=99999&transport.maxIdleTime=1001";
}
@@ -114,31 +89,13 @@ public class WSTransportTest {
return server;
}
- protected int getProxyPort() {
- if (proxyPort == 0) {
- ServerSocket ss = null;
- try {
- ss = ServerSocketFactory.getDefault().createServerSocket(0);
- proxyPort = ss.getLocalPort();
- } catch (IOException e) { // ignore
- } finally {
- try {
- if (ss != null ) {
- ss.close();
- }
- } catch (IOException e) { // ignore
- }
- }
- }
- return proxyPort;
- }
-
protected Connector createJettyConnector(Server server) throws Exception {
Connector c = new SocketConnectorFactory().createConnector(server);
c.getClass().getMethod("setPort", Integer.TYPE).invoke(c, getProxyPort());
return c;
}
+ @Override
protected void stopBroker() throws Exception {
if (broker != null) {
broker.stop();
@@ -147,14 +104,16 @@ public class WSTransportTest {
}
}
+ @Override
@Before
public void setUp() throws Exception {
+ super.setUp();
profileDir = new File("activemq-data/profiles");
- broker = createBroker(true);
stompConnect();
server = createWebServer();
}
+ @Override
@After
public void tearDown() throws Exception {
try {
@@ -163,10 +122,11 @@ public class WSTransportTest {
// Some tests explicitly disconnect from stomp so can ignore
} finally {
try {
- stopBroker();
- } catch (Exception e) {
- LOG.warn("Error on Broker stop.");
+ super.tearDown();
+ } catch (Exception ex) {
+ LOG.warn("Error on super tearDown()");
}
+
if (driver != null) {
try {
driver.quit();
@@ -234,7 +194,7 @@ public class WSTransportTest {
protected String getTestURI() {
int port = getProxyPort();
- return "http://localhost:" + port + "/websocket.html#" + wsUri;
+ return "http://localhost:" + port + "/websocket.html#" + wsConnectUri;
}
public void doTestWebSockets(WebDriver driver) throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
new file mode 100644
index 0000000..9c4abc8
--- /dev/null
+++ b/activemq-http/src/test/java/org/apache/activemq/transport/ws/WSTransportTestSupport.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.ws;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.net.URI;
+
+import javax.jms.JMSException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import javax.net.ServerSocketFactory;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.activemq.spring.SpringSslContext;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Basic infrastructure for test WebSocket connections.
+ */
+public class WSTransportTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(WSTransportTestSupport.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private int proxyPort = 0;
+
+ protected BrokerService broker;
+ protected URI wsConnectUri;
+
+ @Before
+ public void setUp() throws Exception {
+ broker = createBroker(true);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ try {
+ stopBroker();
+ } catch(Exception e) {
+ LOG.warn("Error on Broker stop.");
+ }
+ }
+
+ protected String getWSConnectorURI() {
+ return "ws://127.0.0.1:" + getProxyPort() + "?websocket.maxTextMessageSize=99999&transport.maxIdleTime=1001";
+ }
+
+ protected void addAdditionalConnectors(BrokerService service) throws Exception {
+
+ }
+
+ protected BrokerService createBroker(boolean deleteMessages) throws Exception {
+
+ BrokerService broker = new BrokerService();
+
+ SpringSslContext context = new SpringSslContext();
+ context.setKeyStore("src/test/resources/server.keystore");
+ context.setKeyStoreKeyPassword("password");
+ context.setTrustStore("src/test/resources/client.keystore");
+ context.setTrustStorePassword("password");
+ context.afterPropertiesSet();
+ broker.setSslContext(context);
+
+ wsConnectUri = broker.addConnector(getWSConnectorURI()).getPublishableConnectURI();
+
+ broker.setUseJmx(true);
+ broker.getManagementContext().setCreateConnector(false);
+ broker.setPersistent(isPersistent());
+ broker.setDeleteAllMessagesOnStartup(deleteMessages);
+ broker.start();
+ broker.waitUntilStarted();
+
+ addAdditionalConnectors(broker);
+
+ return broker;
+ }
+
+ protected boolean isPersistent() {
+ return false;
+ }
+
+ protected String getTestName() {
+ return name.getMethodName();
+ }
+
+ protected int getProxyPort() {
+ if (proxyPort == 0) {
+ ServerSocket ss = null;
+ try {
+ ss = ServerSocketFactory.getDefault().createServerSocket(0);
+ proxyPort = ss.getLocalPort();
+ } catch (IOException e) { // ignore
+ } finally {
+ try {
+ if (ss != null ) {
+ ss.close();
+ }
+ } catch (IOException e) { // ignore
+ }
+ }
+ }
+
+ return proxyPort;
+ }
+
+ protected void stopBroker() throws Exception {
+ if (broker != null) {
+ broker.stop();
+ broker.waitUntilStopped();
+ broker = null;
+ }
+ }
+
+ protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
+ ObjectName brokerViewMBean = new ObjectName(
+ "org.apache.activemq:type=Broker,brokerName=localhost");
+ BrokerViewMBean proxy = (BrokerViewMBean) broker.getManagementContext()
+ .newProxyInstance(brokerViewMBean, BrokerViewMBean.class, true);
+ return proxy;
+ }
+
+ protected QueueViewMBean getProxyToQueue(String name) throws MalformedObjectNameException, JMSException {
+ ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="+name);
+ QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext()
+ .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+ return proxy;
+ }
+
+ protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException {
+ ObjectName topicViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name);
+ TopicViewMBean proxy = (TopicViewMBean) broker.getManagementContext()
+ .newProxyInstance(topicViewMBeanName, TopicViewMBean.class, true);
+ return proxy;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05f83b1/activemq-http/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-http/src/test/resources/log4j.properties b/activemq-http/src/test/resources/log4j.properties
index 7cc1941..f28c2a8 100755
--- a/activemq-http/src/test/resources/log4j.properties
+++ b/activemq-http/src/test/resources/log4j.properties
@@ -20,6 +20,8 @@
#
log4j.rootLogger=INFO, out, stdout
+log4j.logger.org.apache.activemq.transport.ws=DEBUG
+
#log4j.logger.org.apache.activemq.broker.scheduler=DEBUG
#log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
#log4j.logger.org.apache.activemq.transport.failover=TRACE