You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/12 01:15:56 UTC
svn commit: r1408161 [1/4] - in /activemq/trunk:
activemq-core/src/test/java/org/apache/activemq/transport/stomp/
activemq-core/src/test/resources/org/apache/activemq/transport/stomp/
activemq-stomp/ activemq-stomp/src/test/ activemq-stomp/src/test/jav...
Author: tabish
Date: Mon Nov 12 00:15:50 2012
New Revision: 1408161
URL: http://svn.apache.org/viewvc?rev=1408161&view=rev
Log:
Move STOMP unit tests into the STOMP module and clean them up:
All tests now use a common StompTestSupport base class to remove cut-and-paste test code.
All tests are now JUnit 4 tests.
All tests are run with AutoFailSupport on so they won't hang.
All tests use dynamic port assignment so they shouldn't clash with others.
Cleaned up the pom file
Added:
activemq/trunk/activemq-stomp/src/test/
activemq/trunk/activemq-stomp/src/test/java/
activemq/trunk/activemq-stomp/src/test/java/org/
activemq/trunk/activemq-stomp/src/test/java/org/apache/
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompFrameTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompLoadTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOLoadTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLLoadTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompSSLLoadTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompSslAuthTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompSslTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTelnetTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompVirtualTopicTest.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/util/
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/util/ResourceLoadingSslContext.java (with props)
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/util/XStreamBrokerContext.java (with props)
activemq/trunk/activemq-stomp/src/test/resources/
activemq/trunk/activemq-stomp/src/test/resources/client.keystore (with props)
activemq/trunk/activemq-stomp/src/test/resources/log4j.properties (with props)
activemq/trunk/activemq-stomp/src/test/resources/login.config (with props)
activemq/trunk/activemq-stomp/src/test/resources/org/
activemq/trunk/activemq-stomp/src/test/resources/org/apache/
activemq/trunk/activemq-stomp/src/test/resources/org/apache/activemq/
activemq/trunk/activemq-stomp/src/test/resources/org/apache/activemq/security/
activemq/trunk/activemq-stomp/src/test/resources/org/apache/activemq/security/groups.properties (with props)
activemq/trunk/activemq-stomp/src/test/resources/org/apache/activemq/security/users.properties (with props)
activemq/trunk/activemq-stomp/src/test/resources/server.keystore (with props)
Removed:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/
activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/
Modified:
activemq/trunk/activemq-stomp/pom.xml
Modified: activemq/trunk/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/pom.xml?rev=1408161&r1=1408160&r2=1408161&view=diff
==============================================================================
--- activemq/trunk/activemq-stomp/pom.xml (original)
+++ activemq/trunk/activemq-stomp/pom.xml Mon Nov 12 00:15:50 2012
@@ -39,23 +39,8 @@
<!-- Required Dependencies -->
<!-- =============================== -->
<dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-broker</artifactId>
- </dependency>
-
- <dependency>
<groupId>${project.groupId}</groupId>
- <artifactId>activeio-core</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq.protobuf</groupId>
- <artifactId>activemq-protobuf</artifactId>
- <optional>false</optional>
- </dependency>
- <dependency>
- <groupId>org.fusesource.mqtt-client</groupId>
- <artifactId>mqtt-client</artifactId>
+ <artifactId>activemq-broker</artifactId>
</dependency>
<!-- =============================== -->
@@ -63,74 +48,36 @@
<!-- =============================== -->
<dependency>
- <groupId>org.osgi</groupId>
- <artifactId>org.osgi.core</artifactId>
- <scope>provided</scope>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-jaas</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jta_1.0.1B_spec</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-annotation_1.0_spec</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
- <groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jacc_1.1_spec</artifactId>
- <optional>true</optional>
- </dependency>
- <dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>xstream</artifactId>
<optional>true</optional>
</dependency>
<dependency>
- <groupId>org.codehaus.jettison</groupId>
- <artifactId>jettison</artifactId>
- <optional>true</optional>
- </dependency>
-
- <!-- for XML parsing -->
- <dependency>
- <groupId>org.apache.xbean</groupId>
- <artifactId>xbean-spring</artifactId>
+ <groupId>org.codehaus.jettison</groupId>
+ <artifactId>jettison</artifactId>
<optional>true</optional>
</dependency>
+
+ <!-- =============================== -->
+ <!-- Testing Dependencies -->
+ <!-- =============================== -->
+
<dependency>
- <groupId>org.springframework</groupId>
- <artifactId>spring-context</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.derby</groupId>
- <artifactId>derby</artifactId>
- <optional>true</optional>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>activemq-kahadb-store</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>xalan</groupId>
- <artifactId>xalan</artifactId>
- <optional>true</optional>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>activemq-broker</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
<dependency>
- <groupId>commons-net</groupId>
- <artifactId>commons-net</artifactId>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>activemq-jaas</artifactId>
+ <scope>test</scope>
</dependency>
-
- <!-- =============================== -->
- <!-- Testing Dependencies -->
- <!-- =============================== -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -165,36 +112,6 @@
</resource>
</resources>
- <pluginManagement>
- <plugins>
- <!--This plugin's configuration is used to store Eclipse m2e settings only.
- It has no influence on the Maven build itself.-->
- <plugin>
- <groupId>org.eclipse.m2e</groupId>
- <artifactId>lifecycle-mapping</artifactId>
- <version>1.0.0</version>
- <configuration>
- <lifecycleMappingMetadata>
- <pluginExecutions>
- <pluginExecution>
- <pluginExecutionFilter>
- <groupId>org.apache.activemq.protobuf</groupId>
- <artifactId>activemq-protobuf</artifactId>
- <versionRange>[0.0.0,)</versionRange>
- <goals>
- <goal>compile</goal>
- </goals>
- </pluginExecutionFilter>
- <action>
- <ignore />
- </action>
- </pluginExecution>
- </pluginExecutions>
- </lifecycleMappingMetadata>
- </configuration>
- </plugin>
- </plugins>
- </pluginManagement>
<plugins>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
@@ -202,8 +119,8 @@
<forkMode>always</forkMode>
<argLine>${surefire.argLine}</argLine>
<runOrder>alphabetical</runOrder>
-
- <systemProperties>
+ <failIfNoTests>false</failIfNoTests>
+ <systemProperties>
<property>
<name>org.apache.activemq.default.directory.prefix</name>
<value>target/</value>
@@ -219,23 +136,12 @@
<value>file:target/test-classes/log4j.properties</value>
</property>
-->
- </systemProperties>
- <includes>
- <include>**/*Test.*</include>
- </includes>
+ </systemProperties>
+ <includes>
+ <include>**/*Test.*</include>
+ </includes>
</configuration>
</plugin>
- <plugin>
- <groupId>org.apache.activemq.protobuf</groupId>
- <artifactId>activemq-protobuf</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>compile</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
<profiles>
Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Vector;
+import javax.net.ServerSocketFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.security.JaasDualAuthenticationPlugin;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static junit.framework.Assert.assertTrue;
+
+// https://issues.apache.org/jira/browse/AMQ-3393
+public class ConnectTest { // Broker-level tests for STOMP connection setup, teardown and inactivity handling
+ private static final Logger LOG = LoggerFactory.getLogger(ConnectTest.class);
+ BrokerService brokerService; // re-created by startBroker() before every test
+ Vector<Throwable> exceptions = new Vector<Throwable>(); // exceptions caught inside the client Thread bodies; every test asserts this is empty
+
+ @Before
+ public void startBroker() throws Exception { // only configures the broker; each test adds its own connector and calls start() itself
+ exceptions.clear();
+ brokerService = new BrokerService();
+ brokerService.setPersistent(false);
+ brokerService.setAdvisorySupport(false);
+ }
+
+ @After
+ public void stopBroker() throws Exception {
+ if (brokerService != null) {
+ brokerService.stop(); // for testJaasDualStopWithOpenConnection this is a second stop(): the test body already stopped the broker
+ }
+ }
+
+ @Test
+ public void testStompConnectLeak() throws Exception { // connects and disconnects in a loop, then checks the connector reports no connections left
+
+ brokerService.addConnector("stomp://0.0.0.0:0?transport.soLinger=0"); // port 0: the actual port is read back via getConnectUri().getPort()
+ brokerService.start();
+
+ Thread t1 = new Thread() { // never start()ed; only its run() method is invoked directly in the loop below
+ StompConnection connection = new StompConnection(); // one instance, reused for every open/connect/disconnect cycle
+
+ public void run() {
+ try {
+ connection.open("localhost", brokerService.getTransportConnectors().get(0).getConnectUri().getPort());
+ connection.connect("system", "manager");
+ connection.disconnect();
+ } catch (Exception ex) {
+ LOG.error("unexpected exception on connect/disconnect", ex);
+ exceptions.add(ex); // recorded rather than rethrown; checked by the final assertion
+ }
+ }
+ };
+
+ int i = 0;
+ long done = System.currentTimeMillis() + (15 * 1000); // keep cycling connections for 15 seconds
+ while (System.currentTimeMillis() < done) {
+ t1.run(); // run(), not start(): the body executes synchronously on the test thread, once per iteration
+ if (++i % 5000 == 0) { // log the connector's connection count every 5000 iterations
+ LOG.info("connection count on stomp connector:" + brokerService.getTransportConnectors().get(0).connectionCount());
+ }
+ }
+
+ assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() { // presumably polls until the condition holds or a default timeout expires - confirm in org.apache.activemq.util.Wait
+ @Override
+ public boolean isSatisified() throws Exception { // spelling must match the Wait.Condition method being overridden
+ return 0 == brokerService.getTransportConnectors().get(0).connectionCount();
+ }
+ }));
+ assertTrue("no exceptions", exceptions.isEmpty());
+ }
+
+ @Test
+ public void testJaasDualStopWithOpenConnection() throws Exception { // stops the broker while a client is still connected, then checks the listen port can be bound again
+
+ brokerService.setPlugins(new BrokerPlugin[]{new JaasDualAuthenticationPlugin()});
+ brokerService.addConnector("stomp://0.0.0.0:0?transport.closeAsync=false");
+ brokerService.start();
+
+ final int listenPort = brokerService.getTransportConnectors().get(0).getConnectUri().getPort(); // captured now because it is needed after the broker has been stopped
+ Thread t1 = new Thread() { // never start()ed; run() is invoked directly below
+ StompConnection connection = new StompConnection();
+
+ public void run() {
+ try {
+ connection.open("localhost", listenPort);
+ connection.connect("system", "manager"); // no disconnect(): the connection is left open on purpose for this test
+ } catch (Exception ex) {
+ LOG.error("unexpected exception on connect/disconnect", ex);
+ exceptions.add(ex);
+ }
+ }
+ };
+
+ t1.run(); // synchronous call on the test thread
+
+ assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 1 == brokerService.getTransportConnectors().get(0).connectionCount();
+ }
+ }));
+
+ brokerService.stop(); // stop with the client connection still open
+
+ // server socket should be available after stop
+ ServerSocket socket = ServerSocketFactory.getDefault().createServerSocket();
+ socket.setReuseAddress(true);
+ InetAddress address = InetAddress.getLocalHost(); // note: binds the local host address, whereas the connector listened on 0.0.0.0
+ socket.bind(new InetSocketAddress(address, listenPort)); // throws if the port cannot be bound, which fails the test
+ LOG.info("bound address: " + socket);
+ socket.close(); // not in a finally block: skipped if bind() above throws
+ assertTrue("no exceptions", exceptions.isEmpty());
+ }
+
+ @Test
+ public void testInactivityMonitor() throws Exception { // connects, stays silent, and expects the connection count to drop back to zero
+
+ brokerService.addConnector("stomp://0.0.0.0:0?transport.defaultHeartBeat=5000,0&transport.useKeepAlive=false"); // presumably makes the broker expect client traffic every 5000 ms - confirm against the STOMP transport options
+ brokerService.start();
+
+ Thread t1 = new Thread() { // never start()ed; run() is invoked directly below
+ StompConnection connection = new StompConnection();
+
+ public void run() {
+ try {
+ connection.open("localhost", brokerService.getTransportConnectors().get(0).getConnectUri().getPort());
+ connection.connect("system", "manager"); // nothing further is sent and the client never disconnects
+ } catch (Exception ex) {
+ LOG.error("unexpected exception on connect/disconnect", ex);
+ exceptions.add(ex);
+ }
+ }
+ };
+
+ t1.run(); // synchronous call on the test thread
+
+ assertTrue("one connection", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 1 == brokerService.getTransportConnectors().get(0).connectionCount();
+ }
+ }));
+
+ // and it should be closed due to inactivity
+ assertTrue("no dangling connections", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return 0 == brokerService.getTransportConnectors().get(0).connectionCount();
+ }
+ }));
+ assertTrue("no exceptions", exceptions.isEmpty());
+ }
+}
\ No newline at end of file
Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.Serializable;
+
+import com.thoughtworks.xstream.annotations.XStreamAlias;
+
+@XStreamAlias("pojo")
+public class SamplePojo implements Serializable { // Serializable name/city bean, aliased as "pojo" for XStream
+ private static final long serialVersionUID = 9118938642100015088L;
+
+ @XStreamAlias("name")
+ private String name; // aliased as "name" for XStream
+ @XStreamAlias("city")
+ private String city; // aliased as "city" for XStream
+
+ public SamplePojo() { // no-arg constructor; leaves name and city null
+ }
+
+ public SamplePojo(String name, String city) { // stores both values as given, without validation
+ this.name = name;
+ this.city = city;
+ }
+
+ public String getCity() {
+ return city;
+ }
+
+ public void setCity(String city) {
+ this.city = city;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+}
Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.broker.TransportConnector;
+
+public class Stomp11NIOSSLTest extends Stomp11Test { // Stomp11Test variant that talks to a stomp+nio+ssl connector
+
+ @Override
+ public void setUp() throws Exception { // sets the JVM-wide javax.net.ssl.* store properties, then delegates to the parent setUp()
+ System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); // relative path: resolved against the JVM working directory
+ System.setProperty("javax.net.ssl.trustStorePassword", "password");
+ System.setProperty("javax.net.ssl.trustStoreType", "jks");
+ System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+ System.setProperty("javax.net.ssl.keyStorePassword", "password");
+ System.setProperty("javax.net.ssl.keyStoreType", "jks");
+ super.setUp();
+ }
+
+ @Override
+ protected void addStompConnector() throws Exception { // registers the stomp+nio+ssl connector and records the port it reports
+ TransportConnector connector = brokerService.addConnector("stomp+nio+ssl://0.0.0.0:"+nioSslPort); // brokerService and nioSslPort are inherited (presumably from StompTestSupport - confirm)
+ nioSslPort = connector.getConnectUri().getPort(); // store the port reported by the connector for createSocket()
+ }
+
+ @Override
+ protected Socket createSocket() throws IOException { // client socket to the connector registered in addStompConnector()
+ SocketFactory factory = SSLSocketFactory.getDefault(); // default JSSE factory; expected to pick up the javax.net.ssl.* properties set in setUp()
+ return factory.createSocket("127.0.0.1", this.nioSslPort);
+ }
+}
Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.apache.activemq.broker.TransportConnector;
+
+public class Stomp11NIOTest extends Stomp11Test { // Stomp11Test variant that talks to a stomp+nio connector
+
+ @Override
+ protected void addStompConnector() throws Exception { // registers the stomp+nio connector and records the port it reports
+ TransportConnector connector = brokerService.addConnector("stomp+nio://0.0.0.0:"+nioPort); // brokerService and nioPort are inherited (presumably from StompTestSupport - confirm)
+ nioPort = connector.getConnectUri().getPort(); // store the port reported by the connector for createSocket()
+ }
+
+ @Override
+ protected Socket createSocket() throws IOException { // plain TCP client socket to the connector registered above
+ return new Socket("127.0.0.1", this.nioPort);
+ }
+}
Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.broker.TransportConnector;
+
+public class Stomp11SslAuthTest extends Stomp11Test { // Stomp11Test variant using SSL connectors configured with needClientAuth=true
+
+ @Override
+ public void setUp() throws Exception { // sets the JVM-wide javax.net.ssl.* store properties, then delegates to the parent setUp()
+ System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore"); // relative path: resolved against the JVM working directory
+ System.setProperty("javax.net.ssl.trustStorePassword", "password");
+ System.setProperty("javax.net.ssl.trustStoreType", "jks");
+ System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+ System.setProperty("javax.net.ssl.keyStorePassword", "password");
+ System.setProperty("javax.net.ssl.keyStoreType", "jks");
+ //System.setProperty("javax.net.debug","ssl,handshake");
+ super.setUp();
+ }
+
+ @Override
+ protected void addOpenWireConnector() throws Exception { // replaces the inherited connector setup with an ssl:// one and records its connect string in jmsUri
+ TransportConnector connector = brokerService.addConnector(
+ "ssl://0.0.0.0:0?needClientAuth=true"); // needClientAuth=true presumably requires a client certificate - confirm against the SSL transport options
+ jmsUri = connector.getPublishableConnectString();
+ }
+
+ @Override
+ protected void addStompConnector() throws Exception { // registers the stomp+ssl connector and records the port it reports in sslPort
+ TransportConnector connector = brokerService.addConnector(
+ "stomp+ssl://0.0.0.0:"+port+"?needClientAuth=true"); // NOTE(review): URI is built from `port` but the result is stored in `sslPort`; the sibling NIO tests use one field for both - confirm intended
+ sslPort = connector.getConnectUri().getPort();
+ }
+
+ @Override
+ protected Socket createSocket() throws IOException { // client socket to the stomp+ssl connector registered above
+ SocketFactory factory = SSLSocketFactory.getDefault(); // default JSSE factory; expected to pick up the javax.net.ssl.* properties set in setUp()
+ return factory.createSocket("127.0.0.1", this.sslPort);
+ }
+}
Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,1057 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.DataInputStream;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Stomp11Test extends StompTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Stomp11Test.class);
+
+ private Connection connection;
+ private Session session;
+ private ActiveMQQueue queue;
+
+ /**
+ * Runs the base-class setUp, opens the STOMP connection, and then creates and starts a
+ * JMS connection with an auto-acknowledge session and the queue object used by the tests.
+ */
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ stompConnect();
+
+ // NOTE(review): nothing in this class closes this JMS connection - confirm that the
+ // base-class teardown or the broker shutdown releases it.
+ this.connection = cf.createConnection("system", "manager");
+ this.session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ this.queue = new ActiveMQQueue(getQueueName());
+ this.connection.start();
+ }
+
+ /**
+ * Registers a plain STOMP connector bound to the current {@code port} value and then
+ * overwrites {@code port} with the port the connector reports.
+ */
+ @Override
+ protected void addStompConnector() throws Exception {
+ final String bindUri = "stomp://0.0.0.0:"+port;
+ TransportConnector stomp = brokerService.addConnector(bindUri);
+ port = stomp.getConnectUri().getPort();
+ }
+
+ /**
+ * Sends a STOMP 1.1 connect frame carrying a request-id and checks that the reply is a
+ * CONNECTED frame containing response-id, version and session headers.
+ */
+ @Test
+ public void testConnect() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "request-id:1\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + connected);
+
+ assertTrue(connected.startsWith("CONNECTED"));
+ assertTrue(connected.contains("response-id:1"));
+ assertTrue(connected.contains("version:1.1"));
+ assertTrue(connected.contains("session:"));
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Connects with STOMP 1.1 and checks that the value of the session header in the
+ * CONNECTED frame begins with the literal text {@code ID:}.
+ */
+ @Test
+ public void testConnectedNeverEncoded() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "request-id:1\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + connected);
+
+ assertTrue(connected.startsWith("CONNECTED"));
+ assertTrue(connected.contains("response-id:1"));
+ assertTrue(connected.contains("version:1.1"));
+ assertTrue(connected.contains("session:"));
+
+ // Keep everything that follows the "session:" key so the value can be inspected.
+ final String sessionKey = "session:";
+ final String sessionValue = connected.substring(connected.indexOf(sessionKey) + sessionKey.length());
+
+ LOG.info("session header follows: " + sessionValue);
+ assertTrue(sessionValue.startsWith("ID:"));
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Offers both 1.0 and 1.1 in accept-version and expects a CONNECTED frame that
+ * reports version 1.1.
+ */
+ @Test
+ public void testConnectWithVersionOptions() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.0,1.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + connected);
+
+ assertTrue(connected.startsWith("CONNECTED"));
+ assertTrue(connected.contains("version:1.1"));
+ assertTrue(connected.contains("session:"));
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Offers versions 1.0 and 10.1 in accept-version and expects a CONNECTED frame that
+ * reports version 1.0.
+ */
+ @Test
+ public void testConnectWithValidFallback() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.0,10.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + connected);
+
+ assertTrue(connected.startsWith("CONNECTED"));
+ assertTrue(connected.contains("version:1.0"));
+ assertTrue(connected.contains("session:"));
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Offers only versions 9.0 and 10.1 in accept-version and expects an ERROR frame that
+ * mentions "version" and carries a message header.
+ */
+ @Test
+ public void testConnectWithInvalidFallback() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:9.0,10.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String reply = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + reply);
+
+ assertTrue(reply.startsWith("ERROR"));
+ assertTrue(reply.contains("version"));
+ assertTrue(reply.contains("message:"));
+ }
+
+ /**
+ * Connects with {@code heart-beat:0,1000}, verifies that the CONNECTED frame carries
+ * version, heart-beat and session headers, and then reads two consecutive heart-beat
+ * bytes from the raw socket, checking that each one is a newline and arrives no sooner
+ * than 900 ms after the previous read completed.
+ */
+ @Test
+ public void testHeartbeats() throws Exception {
+
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:0,1000\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(connectFrame);
+ String f = stompConnection.receiveFrame().trim();
+
+ LOG.info("Broker sent: " + f);
+
+ assertTrue("Failed to receive a connected frame.", f.startsWith("CONNECTED"));
+ assertTrue("Frame should have a version 1.1 header.", f.contains("version:1.1"));
+ assertTrue("Frame should have a heart beat header.", f.contains("heart-beat:"));
+ assertTrue("Frame should have a session header.", f.contains("session:"));
+
+ stompConnection.getStompSocket().getOutputStream().write('\n');
+
+ DataInputStream in = new DataInputStream(stompConnection.getStompSocket().getInputStream());
+ // Discard one byte before timing starts (presumably a newline still pending after
+ // the CONNECTED frame - TODO confirm).
+ in.read();
+
+ // Two consecutive heart-beats are checked the same way.
+ for (int beat = 0; beat < 2; beat++) {
+ long startTime = System.currentTimeMillis();
+ int input = in.read();
+ assertEquals("did not receive the correct heart beat value", '\n', input);
+ long endTime = System.currentTimeMillis();
+ // The check fails when the byte arrives in under 900 ms, i.e. too early.
+ assertTrue("Broker sent KeepAlive earlier than expected", (endTime - startTime) >= 900);
+ }
+
+ String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ /**
+ * Connects with {@code heart-beat:1000,0}, then sends nothing and expects the next
+ * read to fail with an exception no sooner than 1000 ms after the CONNECTED frame
+ * was verified.
+ */
+ @Test
+ public void testHeartbeatsDropsIdleConnection() throws Exception {
+
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:1000,0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(connectFrame);
+ String f = stompConnection.receiveFrame();
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.contains("version:1.1"));
+ assertTrue(f.contains("heart-beat:"));
+ assertTrue(f.contains("session:"));
+ LOG.debug("Broker sent: " + f);
+
+ long startTime = System.currentTimeMillis();
+
+ try {
+ f = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + f);
+ fail("Received a frame on a connection that should have been dropped as idle");
+ } catch (Exception expected) {
+ // Expected: the read fails once the connection is gone. fail() throws an
+ // AssertionError, which is not an Exception and so is not swallowed here.
+ }
+
+ long endTime = System.currentTimeMillis();
+ assertTrue("Broker closed the idle connection too early.", (endTime - startTime) >= 1000);
+ }
+
+ /**
+ * Connects with {@code heart-beat:2000,0}, queues one message, and then sends a
+ * keep-alive every second from a background thread for twenty seconds. Afterwards the
+ * connection must still be usable: a SUBSCRIBE has to deliver the queued MESSAGE.
+ */
+ @Test
+ public void testHeartbeatsKeepsConnectionOpen() throws Exception {
+
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:2000,0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(connectFrame);
+ String f = stompConnection.receiveFrame();
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.contains("version:1.1"));
+ assertTrue(f.contains("heart-beat:"));
+ assertTrue(f.contains("session:"));
+ LOG.debug("Broker sent: " + f);
+
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ stompConnection.sendFrame(message);
+
+ ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+ try {
+ service.scheduleAtFixedRate(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ LOG.info("Sending next KeepAlive");
+ stompConnection.keepAlive();
+ } catch (Exception e) {
+ // Best effort: keep the schedule running; the assertions below decide
+ // the outcome of the test.
+ LOG.warn("Failed to send KeepAlive", e);
+ }
+ }
+ }, 1, 1, TimeUnit.SECONDS);
+
+ TimeUnit.SECONDS.sleep(20);
+
+ String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ StompFrame stompFrame = stompConnection.receive();
+ assertEquals("MESSAGE", stompFrame.getAction());
+ } finally {
+ // Always stop the keep-alive thread, even when a step above throws.
+ service.shutdownNow();
+ }
+
+ String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ /**
+ * Connects with {@code heart-beat:1000,0}, stays silent for ten seconds, and then
+ * expects a SEND that requests a receipt to end in an exception rather than a reply.
+ */
+ @Test
+ public void testSendAfterMissingHeartbeat() throws Exception {
+
+ final String connect = "STOMP\n" + "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:1000,0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(connect);
+ final String connected = stompConnection.receiveFrame();
+ assertTrue(connected.startsWith("CONNECTED"));
+ assertTrue(connected.contains("version:1.1"));
+ assertTrue(connected.contains("heart-beat:"));
+ assertTrue(connected.contains("session:"));
+ LOG.debug("Broker sent: " + connected);
+
+ // Send nothing for much longer than the heart-beat value requested above.
+ TimeUnit.SECONDS.sleep(10);
+
+ try {
+ final String send = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n\n" + "Hello World" + Stomp.NULL;
+ stompConnection.sendFrame(send);
+ stompConnection.receiveFrame();
+ fail("SEND frame has been accepted after missing heart beat");
+ } catch (Exception expected) {
+ LOG.info(expected.getMessage());
+ }
+ }
+
+ /**
+ * Sends a heart-beat header with a single value ({@code heart-beat:0}) and expects an
+ * ERROR frame that mentions "heart-beat" and carries a message header.
+ */
+ @Test
+ public void testRejectInvalidHeartbeats1() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String reply = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + reply);
+
+ assertTrue(reply.startsWith("ERROR"));
+ assertTrue(reply.contains("heart-beat"));
+ assertTrue(reply.contains("message:"));
+ }
+
+ /**
+ * Sends a heart-beat header with a non-numeric value ({@code heart-beat:T,0}) and
+ * expects an ERROR frame that mentions "heart-beat" and carries a message header.
+ */
+ @Test
+ public void testRejectInvalidHeartbeats2() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:T,0\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String reply = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + reply);
+
+ assertTrue(reply.startsWith("ERROR"));
+ assertTrue(reply.contains("heart-beat"));
+ assertTrue(reply.contains("message:"));
+ }
+
+ /**
+ * Sends a heart-beat header with three values ({@code heart-beat:100,10,50}) and
+ * expects an ERROR frame that mentions "heart-beat" and carries a message header.
+ */
+ @Test
+ public void testRejectInvalidHeartbeats3() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "heart-beat:100,10,50\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String reply = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + reply);
+
+ assertTrue(reply.startsWith("ERROR"));
+ assertTrue(reply.contains("heart-beat"));
+ assertTrue(reply.contains("message:"));
+ }
+
+ /**
+ * Checks that a queued message is delivered to a subscription and that, after the
+ * subscription has been removed, a second message is not delivered to this client
+ * (the read is expected to time out).
+ */
+ @Test
+ public void testSubscribeAndUnsubscribe() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + connected);
+
+ assertTrue(connected.startsWith("CONNECTED"));
+
+ final String send = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+
+ stompConnection.sendFrame(send);
+
+ final String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(subscribe);
+
+ final StompFrame delivered = stompConnection.receive();
+ assertTrue(delivered.getAction().equals("MESSAGE"));
+
+ final String unsubscribe = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(unsubscribe);
+
+ // Pause before sending again so the UNSUBSCRIBE can take effect first.
+ Thread.sleep(4000);
+
+ stompConnection.sendFrame(send);
+
+ try {
+ final String unexpected = stompConnection.receiveFrame();
+ LOG.info("Received frame: " + unexpected);
+ fail("No message should have been received since subscription was removed");
+ } catch (SocketTimeoutException expected) {
+ // Expected: the read timed out because nothing was delivered.
+ }
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Sends a SUBSCRIBE frame without an id header on a 1.1 connection and expects an
+ * ERROR frame in response.
+ */
+ @Test
+ public void testSubscribeWithNoId() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + connected);
+
+ assertTrue(connected.startsWith("CONNECTED"));
+
+ final String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(subscribe);
+
+ final String reply = stompConnection.receiveFrame();
+ assertTrue(reply.startsWith("ERROR"));
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Subscribes with an id, then sends an UNSUBSCRIBE frame without an id header and
+ * expects an ERROR frame in response.
+ */
+ @Test
+ public void testUnsubscribeWithNoId() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + connected);
+
+ assertTrue(connected.startsWith("CONNECTED"));
+
+ final String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(subscribe);
+
+ // Pause so the subscription is in place before the unsubscribe is sent.
+ Thread.sleep(2000);
+
+ final String unsubscribe = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(unsubscribe);
+
+ final String reply = stompConnection.receiveFrame();
+ assertTrue(reply.startsWith("ERROR"));
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Receives a message on a client-ack subscription and acknowledges it with an ACK
+ * frame that carries both the subscription id and the message-id, then unsubscribes
+ * and disconnects.
+ */
+ @Test
+ public void testAckMessageWithId() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + connected);
+
+ assertTrue(connected.startsWith("CONNECTED"));
+
+ final String send = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+
+ stompConnection.sendFrame(send);
+
+ final String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(subscribe);
+
+ final StompFrame received = stompConnection.receive();
+ assertTrue(received.getAction().equals("MESSAGE"));
+
+ final String ack = "ACK\n" + "subscription:12345\n" + "message-id:" +
+ received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(ack);
+
+ final String unsubscribe = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(unsubscribe);
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Receives a message on a client-ack subscription and sends an ACK frame that omits
+ * the subscription header; an ERROR frame is expected in response.
+ */
+ @Test
+ public void testAckMessageWithNoId() throws Exception {
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + connected);
+
+ assertTrue(connected.startsWith("CONNECTED"));
+
+ final String send = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+
+ stompConnection.sendFrame(send);
+
+ final String subscribeFrame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(subscribeFrame);
+
+ final StompFrame received = stompConnection.receive();
+ assertTrue(received.getAction().equals("MESSAGE"));
+
+ final String ackFrame = "ACK\n" + "message-id:" +
+ received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(ackFrame);
+
+ final StompFrame errorFrame = stompConnection.receive();
+ assertTrue(errorFrame.getAction().equals("ERROR"));
+
+ final String unsubscribeFrame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(unsubscribeFrame);
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Queues ten messages, reads them through a {@code browser:true} subscription followed
+ * by a final frame whose browser header is "end", and then reads the same ten messages
+ * again through an ordinary subscription.
+ */
+ @Test
+ public void testQueueBrowerSubscription() throws Exception {
+
+ final int MSG_COUNT = 10;
+
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + connected);
+
+ assertTrue(connected.startsWith("CONNECTED"));
+
+ // Each SEND asks for a receipt so the loop only continues once it has been answered.
+ for (int sent = 0; sent < MSG_COUNT; sent++) {
+ final String send = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:0\n" +
+ "\n" + "Hello World {" + sent + "}" + Stomp.NULL;
+ stompConnection.sendFrame(send);
+ final StompFrame response = stompConnection.receive();
+ assertEquals("0", response.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
+ }
+
+ final String browse = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "browser:true\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(browse);
+
+ for (int seen = 0; seen < MSG_COUNT; seen++) {
+ final StompFrame browsed = stompConnection.receive();
+ assertEquals(Stomp.Responses.MESSAGE, browsed.getAction());
+ assertEquals("12345", browsed.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+ }
+
+ // We should now get a browse done message
+ final StompFrame browseDone = stompConnection.receive();
+ LOG.debug("Browse Done: " + browseDone.toString());
+ assertEquals(Stomp.Responses.MESSAGE, browseDone.getAction());
+ assertEquals("12345", browseDone.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+ assertEquals("end", browseDone.getHeaders().get(Stomp.Headers.Message.BROWSER));
+ assertNotNull(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION));
+
+ final String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(unsub);
+
+ Thread.sleep(2000);
+
+ final String consume = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(consume);
+
+ for (int consumed = 0; consumed < MSG_COUNT; consumed++) {
+ final StompFrame delivered = stompConnection.receive();
+ assertEquals(Stomp.Responses.MESSAGE, delivered.getAction());
+ assertEquals("12345", delivered.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+ }
+
+ stompConnection.sendFrame(unsub);
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Sends a STOMP 1.1 message whose headers contain escape sequences and checks, through
+ * a JMS consumer, the correlation id, type, priority, group id and custom properties
+ * of the message that arrives.
+ */
+ @Test
+ public void testSendMessageWithStandardHeadersEncoded() throws Exception {
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ final String connect = "CONNECT\n" + "login:system\n" + "passcode:manager\n" +
+ "accept-version:1.1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ assertTrue(connected.startsWith("CONNECTED"));
+
+ final String send = "SEND\n" + "correlation-id:c1\\:\\n\\23\n" + "priority:3\n" + "type:t34:5\n" + "JMSXGroupID:abc\n" + "foo:a\\bc\n" + "bar:123\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World"
+ + Stomp.NULL;
+
+ stompConnection.sendFrame(send);
+
+ final TextMessage message = (TextMessage)consumer.receive(2500);
+ assertNotNull(message);
+ assertEquals("Hello World", message.getText());
+ assertEquals("JMSCorrelationID", "c1\\:\n\\23", message.getJMSCorrelationID());
+ assertEquals("getJMSType", "t34:5", message.getJMSType());
+ assertEquals("getJMSPriority", 3, message.getJMSPriority());
+ assertEquals("foo", "a\\bc", message.getStringProperty("foo"));
+ assertEquals("bar", "123", message.getStringProperty("bar"));
+
+ // The group id must be visible both as a JMS property and on the ActiveMQ message.
+ assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
+ final ActiveMQTextMessage amqMessage = (ActiveMQTextMessage)message;
+ assertEquals("GroupID", "abc", amqMessage.getGroupID());
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Sends a message that repeats the {@code value} header three times and checks that
+ * the JMS message exposes the first occurrence ("newest").
+ */
+ @Test
+ public void testSendMessageWithRepeatedEntries() throws Exception {
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ final String connect = "CONNECT\n" + "login:system\n" + "passcode:manager\n" +
+ "accept-version:1.1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ assertTrue(connected.startsWith("CONNECTED"));
+
+ final String send = "SEND\n" +
+ "value:newest" + "\n" +
+ "value:older" + "\n" +
+ "value:oldest" + "\n" +
+ "destination:/queue/" + getQueueName() +
+ "\n\n" + "Hello World" + Stomp.NULL;
+
+ stompConnection.sendFrame(send);
+
+ final TextMessage message = (TextMessage)consumer.receive(2500);
+ assertNotNull(message);
+ assertEquals("Hello World", message.getText());
+ assertEquals("newest", message.getStringProperty("value"));
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Publishes a JMS message whose string property {@code s} contains a backslash and a
+ * colon, and checks that the STOMP 1.1 MESSAGE frame carries that header value in its
+ * escaped form.
+ */
+ @Test
+ public void testSubscribeWithMessageSentWithEncodedProperties() throws Exception {
+
+ final String connect = "CONNECT\n" + "login:system\n" + "passcode:manager\n" + "accept-version:1.1" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ assertTrue(connected.startsWith("CONNECTED"));
+
+ final String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(subscribe);
+
+ final MessageProducer producer = session.createProducer(queue);
+ final TextMessage message = session.createTextMessage("Hello World");
+ message.setStringProperty("s", "\\value:");
+ producer.send(message);
+
+ final String delivered = stompConnection.receiveFrame();
+ assertTrue("" + delivered, delivered.startsWith("MESSAGE"));
+
+ // Skip past the three characters of "\ns:" to reach the start of the header value.
+ final int valueStart = delivered.indexOf("\ns:") + 3;
+ final String expectedEncoded = "\\\\value\\c";
+ final String headerVal = delivered.substring(valueStart, valueStart + expectedEncoded.length());
+ assertEquals("" + delivered, expectedEncoded, headerVal);
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Sends a persistent message, receives it on a client-ack subscription, and NACKs it.
+ * The test then subscribes to {@code ActiveMQ.DLQ} and expects to receive a frame with
+ * the same message-id, which it acknowledges before unsubscribing and disconnecting.
+ */
+ @Test
+ public void testNackMessage() throws Exception {
+
+ String connectFrame = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connectFrame);
+
+ String f = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\npersistent:true\n\n" + "Hello World" + Stomp.NULL;
+
+ stompConnection.sendFrame(message);
+
+ String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ StompFrame received = stompConnection.receive();
+ assertEquals("MESSAGE", received.getAction());
+
+ // nack it
+ frame = "NACK\n" + "subscription:12345\n" + "message-id:" +
+ received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ //consume it from dlq
+
+ frame = "SUBSCRIBE\n" + "destination:/queue/ActiveMQ.DLQ\n" +
+ "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ StompFrame receivedDLQ = stompConnection.receive(200);
+ assertNotNull("No frame received from the DLQ", receivedDLQ);
+ // Expected value first, so a failure message reports the two ids the right way round.
+ assertEquals(received.getHeaders().get("message-id"), receivedDLQ.getHeaders().get("message-id"));
+
+ frame = "ACK\n" + "subscription:12345\n" + "message-id:" +
+ received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = "UNSUBSCRIBE\n" + "destination:/queue/ActiveMQ.DLQ\n" +
+ "id:12345\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ /**
+ * Sends a message whose custom header values have leading and/or trailing spaces and
+ * checks that the delivered MESSAGE frame returns those values with the spaces intact.
+ */
+ @Test
+ public void testHeaderValuesAreNotWSTrimmed() throws Exception {
+ stompConnection.setVersion(Stomp.V1_1);
+ final String connect = "STOMP\n" +
+ "login:system\n" +
+ "passcode:manager\n" +
+ "accept-version:1.1\n" +
+ "host:localhost\n" +
+ "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connect);
+
+ final String connected = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + connected);
+
+ assertTrue(connected.startsWith("CONNECTED"));
+
+ final String send = "SEND\n" + "destination:/queue/" + getQueueName() +
+ "\ntest1: value" +
+ "\ntest2:value " +
+ "\ntest3: value " +
+ "\n\n" + "Hello World" + Stomp.NULL;
+
+ stompConnection.sendFrame(send);
+
+ final String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(subscribe);
+
+ final StompFrame received = stompConnection.receive();
+ assertTrue(received.getAction().equals("MESSAGE"));
+
+ // Leading, trailing, and both-sided spaces must each come back unchanged.
+ assertEquals(" value", received.getHeaders().get("test1"));
+ assertEquals("value ", received.getHeaders().get("test2"));
+ assertEquals(" value ", received.getHeaders().get("test3"));
+
+ final String unsubscribe = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+ "id:12345\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(unsubscribe);
+
+ stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+ }
+
+ /**
+ * Creates two durable topic subscriptions, disconnects, and then removes both
+ * subscriptions from a fresh connection. After each step the active and inactive
+ * durable-subscriber counts are read through the broker MBean proxy and checked.
+ */
+ @Test
+ public void testDurableSubAndUnSubOnTwoTopics() throws Exception {
+ stompConnection.setVersion(Stomp.V1_1);
+
+ String domain = "org.apache.activemq";
+ ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
+
+ BrokerViewMBean view = (BrokerViewMBean)brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
+
+ String connectFrame = "STOMP\n" +
+ "login:system\n" + "passcode:manager\n" + "accept-version:1.1\n" +
+ "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connectFrame);
+
+ String frame = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + frame);
+
+ assertTrue(frame.startsWith("CONNECTED"));
+ assertEquals(0, view.getDurableTopicSubscribers().length);
+
+ // subscribe to first destination durably
+ frame = "SUBSCRIBE\n" +
+ "destination:/topic/" + getQueueName() + "1" + "\n" +
+ "ack:auto\n" + "receipt:1\n" + "id:durablesub-1\n" +
+ "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.debug("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ assertEquals("1", receipt.getHeaders().get("receipt-id"));
+ assertEquals(1, view.getDurableTopicSubscribers().length);
+
+ // subscribe to second destination durably
+ frame = "SUBSCRIBE\n" +
+ "destination:/topic/" + getQueueName() + "2" + "\n" +
+ "ack:auto\n" + "receipt:2\n" + "id:durablesub-2\n" +
+ "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ receipt = stompConnection.receive();
+ LOG.debug("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ assertEquals("2", receipt.getHeaders().get("receipt-id"));
+ assertEquals(2, view.getDurableTopicSubscribers().length);
+
+ frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ // Pause so the disconnect can be processed; an interrupt now fails the test instead
+ // of being silently ignored.
+ Thread.sleep(400);
+
+ // reconnect with the same client-id; both subscriptions should now be inactive and
+ // can be removed one at a time.
+ stompConnect();
+ stompConnection.sendFrame(connectFrame);
+ frame = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + frame);
+ assertTrue(frame.startsWith("CONNECTED"));
+ assertEquals(0, view.getDurableTopicSubscribers().length);
+ assertEquals(2, view.getInactiveDurableTopicSubscribers().length);
+
+ // unsubscribe from topic 1
+ frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "1\n" +
+ "id:durablesub-1\n" + "receipt:3\n" +
+ "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ receipt = stompConnection.receive();
+ // Log what the broker returned, not the frame that was just sent.
+ LOG.debug("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ assertEquals("3", receipt.getHeaders().get("receipt-id"));
+
+ assertEquals(1, view.getInactiveDurableTopicSubscribers().length);
+
+ // unsubscribe from topic 2
+ frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "2\n" +
+ "id:durablesub-2\n" + "receipt:4\n" +
+ "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ receipt = stompConnection.receive();
+ LOG.debug("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ assertEquals("4", receipt.getHeaders().get("receipt-id"));
+
+ assertEquals(0, view.getInactiveDurableTopicSubscribers().length);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+
+ /**
+ * Creates two durable topic subscriptions, disconnects, publishes one message to each
+ * topic from a fresh connection while the subscriptions are inactive, and then
+ * re-subscribes to each topic in turn, expecting the message for that subscription.
+ * Active and inactive durable-subscriber counts are checked through the broker MBean
+ * proxy along the way.
+ */
+ @Test
+ public void testMultipleDurableSubsWithOfflineMessages() throws Exception {
+ stompConnection.setVersion(Stomp.V1_1);
+
+ String domain = "org.apache.activemq";
+ ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
+
+ BrokerViewMBean view = (BrokerViewMBean)brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
+
+ String connectFrame = "STOMP\n" + "login:system\n" + "passcode:manager\n" +
+ "accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;
+ stompConnection.sendFrame(connectFrame);
+
+ String frame = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + frame);
+
+ assertTrue(frame.startsWith("CONNECTED"));
+ assertEquals(0, view.getDurableTopicSubscribers().length);
+
+ // subscribe to first destination durably
+ frame = "SUBSCRIBE\n" +
+ "destination:/topic/" + getQueueName() + "1" + "\n" +
+ "ack:auto\n" + "receipt:1\n" + "id:durablesub-1\n" +
+ "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.debug("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ assertEquals("1", receipt.getHeaders().get("receipt-id"));
+ assertEquals(1, view.getDurableTopicSubscribers().length);
+
+ // subscribe to second destination durably
+ frame = "SUBSCRIBE\n" +
+ "destination:/topic/" + getQueueName() + "2" + "\n" +
+ "ack:auto\n" + "receipt:2\n" + "id:durablesub-2\n" +
+ "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ receipt = stompConnection.receive();
+ LOG.debug("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ assertEquals("2", receipt.getHeaders().get("receipt-id"));
+ assertEquals(2, view.getDurableTopicSubscribers().length);
+
+ frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ // Pause so the disconnect can be processed; an interrupt now fails the test instead
+ // of being silently ignored.
+ Thread.sleep(400);
+
+ // reconnect and send some messages to the offline subscribers and then try to get
+ // them after subscribing again.
+ stompConnect();
+ stompConnection.sendFrame(connectFrame);
+ frame = stompConnection.receiveFrame();
+ LOG.debug("Broker sent: " + frame);
+ assertTrue(frame.startsWith("CONNECTED"));
+ assertEquals(0, view.getDurableTopicSubscribers().length);
+ assertEquals(2, view.getInactiveDurableTopicSubscribers().length);
+
+ frame = "SEND\n" + "destination:/topic/" + getQueueName() + "1\n" +
+ "receipt:10\n" + "\n" + "Hello World 1" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ receipt = stompConnection.receive();
+ assertEquals("10", receipt.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
+
+ frame = "SEND\n" + "destination:/topic/" + getQueueName() + "2\n" +
+ "receipt:11\n" + "\n" + "Hello World 2" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ receipt = stompConnection.receive();
+ assertEquals("11", receipt.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
+
+ // subscribe to first destination durably
+ frame = "SUBSCRIBE\n" +
+ "destination:/topic/" + getQueueName() + "1" + "\n" +
+ "ack:auto\n" + "receipt:3\n" + "id:durablesub-1\n" +
+ "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ receipt = stompConnection.receive();
+ LOG.debug("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ assertEquals("3", receipt.getHeaders().get("receipt-id"));
+ assertEquals(1, view.getDurableTopicSubscribers().length);
+
+ StompFrame message = stompConnection.receive();
+ assertEquals(Stomp.Responses.MESSAGE, message.getAction());
+ assertEquals("durablesub-1", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+
+ assertEquals(1, view.getDurableTopicSubscribers().length);
+ assertEquals(1, view.getInactiveDurableTopicSubscribers().length);
+
+ // subscribe to second destination durably
+ frame = "SUBSCRIBE\n" +
+ "destination:/topic/" + getQueueName() + "2" + "\n" +
+ "ack:auto\n" + "receipt:4\n" + "id:durablesub-2\n" +
+ "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ receipt = stompConnection.receive();
+ LOG.debug("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ assertEquals("4", receipt.getHeaders().get("receipt-id"));
+ assertEquals(2, view.getDurableTopicSubscribers().length);
+
+ message = stompConnection.receive();
+ assertEquals(Stomp.Responses.MESSAGE, message.getAction());
+ assertEquals("durablesub-2", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+
+ assertEquals(2, view.getDurableTopicSubscribers().length);
+ assertEquals(0, view.getInactiveDurableTopicSubscribers().length);
+
+ frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+ }
+}
Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.broker.TransportConnector;
+
+/**
+ * {@link Stomp12Test} subclass that overrides the connector and socket hooks
+ * so that a {@code stomp+nio+ssl} connector is added to the broker and the
+ * client side connects through the default {@link SSLSocketFactory}.
+ */
+public class Stomp12NIOSSLTest extends Stomp12Test {
+
+    /**
+     * Sets the {@code javax.net.ssl.*} trust store and key store system
+     * properties (location, password, type) and then delegates to the
+     * inherited set-up.
+     *
+     * @throws Exception if the inherited set-up fails
+     */
+    @Override
+    public void setUp() throws Exception {
+        // System properties are JVM-wide. They are set before super.setUp();
+        // presumably the parent set-up starts the broker and needs the stores
+        // to be configured already - confirm in the base class.
+
+        // Trust store.
+        System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+
+        // Key store.
+        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+
+        super.setUp();
+    }
+
+    /**
+     * Adds a {@code stomp+nio+ssl} connector on address 0.0.0.0 using the
+     * current value of {@code nioSslPort}, then stores the port reported by
+     * the connector's connect URI back into {@code nioSslPort}.
+     *
+     * @throws Exception if the connector cannot be added or its connect URI
+     *         cannot be obtained
+     */
+    @Override
+    protected void addStompConnector() throws Exception {
+        // NOTE(review): nioSslPort is presumably 0 on first use so that a free
+        // port gets picked - confirm against the base class.
+        final String bindUri = "stomp+nio+ssl://0.0.0.0:" + nioSslPort;
+        final TransportConnector nioSslConnector = brokerService.addConnector(bindUri);
+        nioSslPort = nioSslConnector.getConnectUri().getPort();
+    }
+
+    /**
+     * Opens a socket to 127.0.0.1 on {@code nioSslPort} using the default
+     * {@link SSLSocketFactory}.
+     *
+     * @return the socket created by the factory
+     * @throws IOException if the socket cannot be created
+     */
+    @Override
+    protected Socket createSocket() throws IOException {
+        final SocketFactory sslFactory = SSLSocketFactory.getDefault();
+        return sslFactory.createSocket("127.0.0.1", this.nioSslPort);
+    }
+}
Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.apache.activemq.broker.TransportConnector;
+
+/**
+ * {@link Stomp12Test} subclass that overrides the connector and socket hooks
+ * so that a {@code stomp+nio} connector is added to the broker and the client
+ * side connects with a plain {@link Socket}.
+ */
+public class Stomp12NIOTest extends Stomp12Test {
+
+    /**
+     * Adds a {@code stomp+nio} connector on address 0.0.0.0 using the current
+     * value of {@code nioPort}, then stores the port reported by the
+     * connector's connect URI back into {@code nioPort}.
+     *
+     * @throws Exception if the connector cannot be added or its connect URI
+     *         cannot be obtained
+     */
+    @Override
+    protected void addStompConnector() throws Exception {
+        // NOTE(review): nioPort is presumably 0 on first use so that a free
+        // port gets picked - confirm against the base class.
+        final String bindUri = "stomp+nio://0.0.0.0:" + nioPort;
+        final TransportConnector nioConnector = brokerService.addConnector(bindUri);
+        nioPort = nioConnector.getConnectUri().getPort();
+    }
+
+    /**
+     * Opens a plain socket to 127.0.0.1 on {@code nioPort}.
+     *
+     * @return the newly connected socket
+     * @throws IOException if the socket cannot be created
+     */
+    @Override
+    protected Socket createSocket() throws IOException {
+        final int targetPort = this.nioPort;
+        return new Socket("127.0.0.1", targetPort);
+    }
+}
Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.broker.TransportConnector;
+
+/**
+ * {@link Stomp12Test} subclass whose OpenWire and STOMP connectors are SSL
+ * based and configured with {@code needClientAuth=true}; the client side
+ * connects through the default {@link SSLSocketFactory}.
+ */
+public class Stomp12SslAuthTest extends Stomp12Test {
+
+    /**
+     * Sets the {@code javax.net.ssl.*} trust store and key store system
+     * properties (location, password, type) and then delegates to the
+     * inherited set-up.
+     *
+     * @throws Exception if the inherited set-up fails
+     */
+    @Override
+    public void setUp() throws Exception {
+        // NOTE(review): these properties are JVM-wide and this class does not
+        // restore them afterwards.
+        System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+        // Uncomment to trace the SSL handshake while debugging:
+        //System.setProperty("javax.net.debug","ssl,handshake");
+        super.setUp();
+    }
+
+    /**
+     * Adds an {@code ssl} connector on address 0.0.0.0, port 0, with
+     * {@code needClientAuth=true}, and stores the connector's publishable
+     * connect string in {@code jmsUri}.
+     *
+     * @throws Exception if the connector cannot be added or its connect
+     *         string cannot be obtained
+     */
+    @Override
+    protected void addOpenWireConnector() throws Exception {
+        TransportConnector connector = brokerService.addConnector(
+                "ssl://0.0.0.0:0?needClientAuth=true");
+        jmsUri = connector.getPublishableConnectString();
+    }
+
+    /**
+     * Adds a {@code stomp+ssl} connector on address 0.0.0.0 with
+     * {@code needClientAuth=true}, using the current value of
+     * {@code sslPort}, then stores the port reported by the connector's
+     * connect URI back into {@code sslPort}.
+     *
+     * @throws Exception if the connector cannot be added or its connect URI
+     *         cannot be obtained
+     */
+    @Override
+    protected void addStompConnector() throws Exception {
+        // The bind URI is built from sslPort - the field that is updated
+        // below and read by createSocket() - so one field describes this
+        // connector throughout, as the NIO variants do with their own field.
+        // NOTE(review): sslPort is presumably 0 on first use so that a free
+        // port gets picked - confirm against the base class.
+        TransportConnector connector = brokerService.addConnector(
+                "stomp+ssl://0.0.0.0:" + sslPort + "?needClientAuth=true");
+        sslPort = connector.getConnectUri().getPort();
+    }
+
+    /**
+     * Opens a socket to 127.0.0.1 on {@code sslPort} using the default
+     * {@link SSLSocketFactory}.
+     *
+     * @return the socket created by the factory
+     * @throws IOException if the socket cannot be created
+     */
+    @Override
+    protected Socket createSocket() throws IOException {
+        SocketFactory factory = SSLSocketFactory.getDefault();
+        return factory.createSocket("127.0.0.1", this.sslPort);
+    }
+}
Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java
------------------------------------------------------------------------------
svn:eol-style = native