You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/11/12 01:15:56 UTC

svn commit: r1408161 [1/4] - in /activemq/trunk: activemq-core/src/test/java/org/apache/activemq/transport/stomp/ activemq-core/src/test/resources/org/apache/activemq/transport/stomp/ activemq-stomp/ activemq-stomp/src/test/ activemq-stomp/src/test/jav...

Author: tabish
Date: Mon Nov 12 00:15:50 2012
New Revision: 1408161

URL: http://svn.apache.org/viewvc?rev=1408161&view=rev
Log:
Move STOMP unit tests into the STOMP module and clean them up:

All tests now use a common StompTestSupport base class to remove cut and paste test code.
All tests are now JUnit 4 tests.
All tests are run with AutoFailSupport on so they won't hang.
All tests use dynamic port assignment so they shouldn't clash with others.

Cleaned up the pom file

Added:
    activemq/trunk/activemq-stomp/src/test/
    activemq/trunk/activemq-stomp/src/test/java/
    activemq/trunk/activemq-stomp/src/test/java/org/
    activemq/trunk/activemq-stomp/src/test/java/org/apache/
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompAdvisoryTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompFrameTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompLoadTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompMissingMessageTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOLoadTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLLoadTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOSSLTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompNIOTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompSSLLoadTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompSslAuthTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompSslTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompSubscriptionRemoveTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTelnetTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTestSupport.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompVirtualTopicTest.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/util/
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/util/ResourceLoadingSslContext.java   (with props)
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/util/XStreamBrokerContext.java   (with props)
    activemq/trunk/activemq-stomp/src/test/resources/
    activemq/trunk/activemq-stomp/src/test/resources/client.keystore   (with props)
    activemq/trunk/activemq-stomp/src/test/resources/log4j.properties   (with props)
    activemq/trunk/activemq-stomp/src/test/resources/login.config   (with props)
    activemq/trunk/activemq-stomp/src/test/resources/org/
    activemq/trunk/activemq-stomp/src/test/resources/org/apache/
    activemq/trunk/activemq-stomp/src/test/resources/org/apache/activemq/
    activemq/trunk/activemq-stomp/src/test/resources/org/apache/activemq/security/
    activemq/trunk/activemq-stomp/src/test/resources/org/apache/activemq/security/groups.properties   (with props)
    activemq/trunk/activemq-stomp/src/test/resources/org/apache/activemq/security/users.properties   (with props)
    activemq/trunk/activemq-stomp/src/test/resources/server.keystore   (with props)
Removed:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/
    activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/transport/stomp/
Modified:
    activemq/trunk/activemq-stomp/pom.xml

Modified: activemq/trunk/activemq-stomp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/pom.xml?rev=1408161&r1=1408160&r2=1408161&view=diff
==============================================================================
--- activemq/trunk/activemq-stomp/pom.xml (original)
+++ activemq/trunk/activemq-stomp/pom.xml Mon Nov 12 00:15:50 2012
@@ -39,23 +39,8 @@
     <!-- Required Dependencies -->
     <!-- =============================== -->
     <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-broker</artifactId>
-    </dependency>
-
-    <dependency>
       <groupId>${project.groupId}</groupId>
-      <artifactId>activeio-core</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.activemq.protobuf</groupId>
-      <artifactId>activemq-protobuf</artifactId>
-      <optional>false</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.fusesource.mqtt-client</groupId>
-      <artifactId>mqtt-client</artifactId>
+      <artifactId>activemq-broker</artifactId>
     </dependency>
 
     <!-- =============================== -->
@@ -63,74 +48,36 @@
     <!-- =============================== -->
 
     <dependency>
-      <groupId>org.osgi</groupId>
-      <artifactId>org.osgi.core</artifactId>
-      <scope>provided</scope>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.activemq</groupId>
-      <artifactId>activemq-jaas</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.geronimo.specs</groupId>
-      <artifactId>geronimo-jta_1.0.1B_spec</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.geronimo.specs</groupId>
-      <artifactId>geronimo-j2ee-management_1.1_spec</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.geronimo.specs</groupId>
-      <artifactId>geronimo-annotation_1.0_spec</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.geronimo.specs</groupId>
-      <artifactId>geronimo-jacc_1.1_spec</artifactId>
-      <optional>true</optional>
-    </dependency>
-    <dependency>
       <groupId>com.thoughtworks.xstream</groupId>
       <artifactId>xstream</artifactId>
       <optional>true</optional>
     </dependency>
     <dependency>
-        <groupId>org.codehaus.jettison</groupId>
-        <artifactId>jettison</artifactId>
-        <optional>true</optional>
-    </dependency>
-
-    <!-- for XML parsing -->
-    <dependency>
-      <groupId>org.apache.xbean</groupId>
-      <artifactId>xbean-spring</artifactId>
+      <groupId>org.codehaus.jettison</groupId>
+      <artifactId>jettison</artifactId>
       <optional>true</optional>
     </dependency>
+
+    <!-- =============================== -->
+    <!-- Testing Dependencies            -->
+    <!-- =============================== -->
+
     <dependency>
-      <groupId>org.springframework</groupId>
-      <artifactId>spring-context</artifactId>
-    </dependency>
-    <dependency>
-      <groupId>org.apache.derby</groupId>
-      <artifactId>derby</artifactId>
-      <optional>true</optional>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>activemq-kahadb-store</artifactId>
+      <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>xalan</groupId>
-      <artifactId>xalan</artifactId>
-      <optional>true</optional>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>activemq-broker</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>commons-net</groupId>
-      <artifactId>commons-net</artifactId>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>activemq-jaas</artifactId>
+      <scope>test</scope>
     </dependency>
-
-    <!-- =============================== -->
-    <!-- Testing Dependencies            -->
-    <!-- =============================== -->
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
@@ -165,36 +112,6 @@
       </resource>
     </resources>
 
-    <pluginManagement>
-      <plugins>
-        <!--This plugin's configuration is used to store Eclipse m2e settings only.
-            It has no influence on the Maven build itself.-->
-        <plugin>
-          <groupId>org.eclipse.m2e</groupId>
-          <artifactId>lifecycle-mapping</artifactId>
-          <version>1.0.0</version>
-          <configuration>
-            <lifecycleMappingMetadata>
-              <pluginExecutions>
-                <pluginExecution>
-                  <pluginExecutionFilter>
-                    <groupId>org.apache.activemq.protobuf</groupId>
-                    <artifactId>activemq-protobuf</artifactId>
-                    <versionRange>[0.0.0,)</versionRange>
-                    <goals>
-                      <goal>compile</goal>
-                    </goals>
-                  </pluginExecutionFilter>
-                  <action>
-                    <ignore />
-                  </action>
-                </pluginExecution>
-              </pluginExecutions>
-            </lifecycleMappingMetadata>
-          </configuration>
-        </plugin>
-      </plugins>
-    </pluginManagement>
     <plugins>
       <plugin>
         <artifactId>maven-surefire-plugin</artifactId>
@@ -202,8 +119,8 @@
           <forkMode>always</forkMode>
           <argLine>${surefire.argLine}</argLine>
           <runOrder>alphabetical</runOrder>
-
-           <systemProperties>
+          <failIfNoTests>false</failIfNoTests>
+          <systemProperties>
             <property>
               <name>org.apache.activemq.default.directory.prefix</name>
               <value>target/</value>
@@ -219,23 +136,12 @@
               <value>file:target/test-classes/log4j.properties</value>
             </property>
             -->
-           </systemProperties>
-           <includes>
-             <include>**/*Test.*</include>
-           </includes>
+          </systemProperties>
+          <includes>
+            <include>**/*Test.*</include>
+          </includes>
         </configuration>
       </plugin>
-      <plugin>
-        <groupId>org.apache.activemq.protobuf</groupId>
-        <artifactId>activemq-protobuf</artifactId>
-         <executions>
-          <execution>
-            <goals>
-              <goal>compile</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
     </plugins>
   </build>
   <profiles>

Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.util.Vector;
+import javax.net.ServerSocketFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.security.JaasDualAuthenticationPlugin;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import static junit.framework.Assert.assertTrue;
+
+// https://issues.apache.org/jira/browse/AMQ-3393
+/**
+ * Exercises connection bookkeeping on a STOMP transport connector: repeated
+ * connect/disconnect cycles, stopping the broker while a client is still
+ * connected, and a client that connects and then stays silent.
+ */
+public class ConnectTest {
+    private static final Logger LOG = LoggerFactory.getLogger(ConnectTest.class);
+    BrokerService brokerService;
+    Vector<Throwable> exceptions = new Vector<Throwable>();
+
+    /**
+     * Creates (but does not start) a non-persistent broker with advisory
+     * support switched off; each test adds its own connector and starts it.
+     */
+    @Before
+    public void startBroker() throws Exception {
+        exceptions.clear();
+        brokerService = new BrokerService();
+        brokerService.setPersistent(false);
+        brokerService.setAdvisorySupport(false);
+    }
+
+    /** Stops the broker created by {@link #startBroker()}, if any. */
+    @After
+    public void stopBroker() throws Exception {
+        if (brokerService != null) {
+            brokerService.stop();
+        }
+    }
+
+    /**
+     * Connects and disconnects in a tight loop for 15 seconds and then expects
+     * the connector to report zero connections and no client-side failures.
+     */
+    @Test
+    public void testStompConnectLeak() throws Exception {
+
+        brokerService.addConnector("stomp://0.0.0.0:0?transport.soLinger=0");
+        brokerService.start();
+
+        final Runnable client = newClient(stompPort(), true);
+
+        int i = 0;
+        long done = System.currentTimeMillis() + (15 * 1000);
+        while (System.currentTimeMillis() < done) {
+            client.run();
+            if (++i % 5000 == 0) {
+                LOG.info("connection count on stomp connector:" + brokerService.getTransportConnectors().get(0).connectionCount());
+            }
+        }
+
+        assertTrue("no dangling connections", Wait.waitFor(connectionCountIs(0)));
+        assertTrue("no exceptions", exceptions.isEmpty());
+    }
+
+    /**
+     * Stops the broker while one client is still connected and then checks
+     * that the connector's listen port can be bound again.
+     */
+    @Test
+    public void testJaasDualStopWithOpenConnection() throws Exception {
+
+        brokerService.setPlugins(new BrokerPlugin[]{new JaasDualAuthenticationPlugin()});
+        brokerService.addConnector("stomp://0.0.0.0:0?transport.closeAsync=false");
+        brokerService.start();
+
+        final int listenPort = stompPort();
+        newClient(listenPort, false).run();
+
+        assertTrue("one connection", Wait.waitFor(connectionCountIs(1)));
+
+        brokerService.stop();
+
+        // server socket should be available after stop
+        ServerSocket socket = ServerSocketFactory.getDefault().createServerSocket();
+        try {
+            socket.setReuseAddress(true);
+            InetAddress address = InetAddress.getLocalHost();
+            socket.bind(new InetSocketAddress(address, listenPort));
+            LOG.info("bound address: " + socket);
+        } finally {
+            // release the probe socket even when the bind itself fails
+            socket.close();
+        }
+        assertTrue("no exceptions", exceptions.isEmpty());
+    }
+
+    /**
+     * Connects a client that never sends anything further and expects the
+     * connector to report one connection first and zero connections later.
+     */
+    @Test
+    public void testInactivityMonitor() throws Exception {
+
+        brokerService.addConnector("stomp://0.0.0.0:0?transport.defaultHeartBeat=5000,0&transport.useKeepAlive=false");
+        brokerService.start();
+
+        newClient(stompPort(), false).run();
+
+        assertTrue("one connection", Wait.waitFor(connectionCountIs(1)));
+
+        // and it should be closed due to inactivity
+        assertTrue("no dangling connections", Wait.waitFor(connectionCountIs(0)));
+        assertTrue("no exceptions", exceptions.isEmpty());
+    }
+
+    /** Returns the port of the connect URI of the first transport connector. */
+    private int stompPort() throws Exception {
+        return brokerService.getTransportConnectors().get(0).getConnectUri().getPort();
+    }
+
+    /** Builds a condition that holds once the first connector reports {@code expected} connections. */
+    private Wait.Condition connectionCountIs(final int expected) {
+        return new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return expected == brokerService.getTransportConnectors().get(0).connectionCount();
+            }
+        };
+    }
+
+    /**
+     * Builds a task that opens a STOMP connection to localhost on the given
+     * port and logs in; it runs on whichever thread invokes {@code run()}.
+     * Any exception is logged and recorded in {@link #exceptions} instead of
+     * being propagated.
+     *
+     * @param port port to connect to
+     * @param disconnectAfterConnect whether to disconnect again right after logging in
+     */
+    private Runnable newClient(final int port, final boolean disconnectAfterConnect) {
+        return new Runnable() {
+            // one connection object is reused for every invocation of run()
+            private final StompConnection connection = new StompConnection();
+
+            @Override
+            public void run() {
+                try {
+                    connection.open("localhost", port);
+                    connection.connect("system", "manager");
+                    if (disconnectAfterConnect) {
+                        connection.disconnect();
+                    }
+                } catch (Exception ex) {
+                    LOG.error("unexpected exception on connect/disconnect", ex);
+                    exceptions.add(ex);
+                }
+            }
+        };
+    }
+}
\ No newline at end of file

Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/ConnectTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.Serializable;
+
+import com.thoughtworks.xstream.annotations.XStreamAlias;
+
+/**
+ * Small serializable bean carrying a name and a city, aliased for XStream
+ * as {@code pojo}.
+ */
+@XStreamAlias("pojo")
+public class SamplePojo implements Serializable {
+    private static final long serialVersionUID = 9118938642100015088L;
+
+    /** Aliased as {@code name}. */
+    @XStreamAlias("name")
+    private String name;
+
+    /** Aliased as {@code city}. */
+    @XStreamAlias("city")
+    private String city;
+
+    /** Creates an instance with neither name nor city set. */
+    public SamplePojo() {
+        this(null, null);
+    }
+
+    /**
+     * Creates an instance with both properties populated.
+     *
+     * @param name value for the name property
+     * @param city value for the city property
+     */
+    public SamplePojo(String name, String city) {
+        this.name = name;
+        this.city = city;
+    }
+
+    /** @return the current name, possibly {@code null} */
+    public String getName() {
+        return this.name;
+    }
+
+    /** @param name the new name */
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /** @return the current city, possibly {@code null} */
+    public String getCity() {
+        return this.city;
+    }
+
+    /** @param city the new city */
+    public void setCity(String city) {
+        this.city = city;
+    }
+}

Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/SamplePojo.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.broker.TransportConnector;
+
+/**
+ * Variant of {@link Stomp11Test} whose STOMP connector uses the
+ * {@code stomp+nio+ssl} scheme and whose client sockets come from the default
+ * {@link SSLSocketFactory}.
+ */
+public class Stomp11NIOSSLTest extends Stomp11Test {
+
+    /**
+     * Points the JVM-wide SSL system properties at the test key stores and
+     * then runs the inherited set-up.
+     */
+    @Override
+    public void setUp() throws Exception {
+        installSslSystemProperties();
+        super.setUp();
+    }
+
+    /** Registers the trust store and key store locations, passwords and types. */
+    private static void installSslSystemProperties() {
+        System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+    }
+
+    /**
+     * Adds the NIO+SSL STOMP connector and stores the port taken from its
+     * connect URI in {@code nioSslPort}.
+     */
+    @Override
+    protected void addStompConnector() throws Exception {
+        final String bindUri = "stomp+nio+ssl://0.0.0.0:" + nioSslPort;
+        final TransportConnector stompConnector = brokerService.addConnector(bindUri);
+        nioSslPort = stompConnector.getConnectUri().getPort();
+    }
+
+    /** Opens an SSL client socket to 127.0.0.1 on {@code nioSslPort}. */
+    @Override
+    protected Socket createSocket() throws IOException {
+        final SocketFactory sslSockets = SSLSocketFactory.getDefault();
+        return sslSockets.createSocket("127.0.0.1", nioSslPort);
+    }
+}

Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOSSLTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.apache.activemq.broker.TransportConnector;
+
+/**
+ * Variant of {@link Stomp11Test} whose STOMP connector uses the
+ * {@code stomp+nio} scheme.
+ */
+public class Stomp11NIOTest extends Stomp11Test {
+
+    /**
+     * Adds the NIO STOMP connector and stores the port taken from its connect
+     * URI in {@code nioPort}.
+     */
+    @Override
+    protected void addStompConnector() throws Exception {
+        final String bindUri = "stomp+nio://0.0.0.0:" + nioPort;
+        final TransportConnector stompConnector = brokerService.addConnector(bindUri);
+        nioPort = stompConnector.getConnectUri().getPort();
+    }
+
+    /** Opens a plain client socket to 127.0.0.1 on {@code nioPort}. */
+    @Override
+    protected Socket createSocket() throws IOException {
+        final Socket client = new Socket("127.0.0.1", nioPort);
+        return client;
+    }
+}

Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11NIOTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.broker.TransportConnector;
+
+/**
+ * Variant of {@link Stomp11Test} whose OpenWire and STOMP connectors both use
+ * SSL with {@code needClientAuth=true}, and whose client sockets come from
+ * the default {@link SSLSocketFactory}.
+ */
+public class Stomp11SslAuthTest extends Stomp11Test {
+
+    /**
+     * Points the JVM-wide SSL system properties at the test key stores and
+     * then runs the inherited set-up.
+     */
+    @Override
+    public void setUp() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+        // Enable the next line to trace the SSL handshake when debugging.
+        //System.setProperty("javax.net.debug","ssl,handshake");
+        super.setUp();
+    }
+
+    /**
+     * Adds an SSL OpenWire connector on a dynamically assigned port and stores
+     * its publishable connect string in {@code jmsUri}.
+     */
+    @Override
+    protected void addOpenWireConnector() throws Exception {
+        TransportConnector connector = brokerService.addConnector(
+                "ssl://0.0.0.0:0?needClientAuth=true");
+        jmsUri = connector.getPublishableConnectString();
+    }
+
+    /**
+     * Adds the SSL STOMP connector and stores the port taken from its connect
+     * URI in {@code sslPort}.
+     */
+    @Override
+    protected void addStompConnector() throws Exception {
+        // Bind with the same field that records the result, so the requested
+        // and the stored port cannot drift apart.
+        // NOTE(review): assumes sslPort starts at the same value as port - confirm in StompTestSupport.
+        TransportConnector connector = brokerService.addConnector(
+                "stomp+ssl://0.0.0.0:"+sslPort+"?needClientAuth=true");
+        sslPort = connector.getConnectUri().getPort();
+    }
+
+    /** Opens an SSL client socket to 127.0.0.1 on {@code sslPort}. */
+    @Override
+    protected Socket createSocket() throws IOException {
+        SocketFactory factory = SSLSocketFactory.getDefault();
+        return factory.createSocket("127.0.0.1", this.sslPort);
+    }
+}

Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11SslAuthTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,1057 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.DataInputStream;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Stomp11Test extends StompTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Stomp11Test.class);
+
+    private Connection connection;
+    private Session session;
+    private ActiveMQQueue queue;
+
+    /**
+     * Runs the base-class set up, opens the STOMP connection, and then creates
+     * and starts a JMS connection (user "system") with a non-transacted,
+     * auto-acknowledge session plus the queue object used by the tests.
+     *
+     * @throws Exception if any of the set up steps fails
+     */
+    @Override
+    public void setUp() throws Exception {
+
+        super.setUp();
+
+        stompConnect();
+
+        // JMS-side fixtures used by tests that check STOMP traffic from the JMS end.
+        connection = cf.createConnection("system", "manager");
+        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        queue = new ActiveMQQueue(getQueueName());
+        connection.start();
+    }
+
+    @Override
+    protected void addStompConnector() throws Exception {
+        TransportConnector connector = brokerService.addConnector("stomp://0.0.0.0:"+port);
+        port = connector.getConnectUri().getPort();
+    }
+
+    /**
+     * Sends a STOMP 1.1 connect frame carrying {@code request-id:1} and checks
+     * that the CONNECTED reply contains the matching response-id plus version
+     * and session headers.
+     */
+    @Test
+    public void testConnect() throws Exception {
+
+        final String handshake = "STOMP\n" +
+                                 "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:1.1\n" +
+                                 "host:localhost\n" +
+                                 "request-id:1\n" +
+                                 "\n" + Stomp.NULL;
+        stompConnection.sendFrame(handshake);
+
+        final String reply = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + reply);
+
+        assertTrue(reply.startsWith("CONNECTED"));
+        assertTrue(reply.contains("response-id:1"));
+        assertTrue(reply.contains("version:1.1"));
+        assertTrue(reply.contains("session:"));
+
+        stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+    }
+
+    /**
+     * Connects with STOMP 1.1 and checks that the text following the
+     * {@code session:} header key in the CONNECTED frame starts with the
+     * literal {@code ID:}.
+     */
+    @Test
+    public void testConnectedNeverEncoded() throws Exception {
+
+        final String handshake = "STOMP\n" +
+                                 "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:1.1\n" +
+                                 "host:localhost\n" +
+                                 "request-id:1\n" +
+                                 "\n" + Stomp.NULL;
+        stompConnection.sendFrame(handshake);
+
+        final String reply = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + reply);
+
+        assertTrue(reply.startsWith("CONNECTED"));
+        assertTrue(reply.contains("response-id:1"));
+        assertTrue(reply.contains("version:1.1"));
+        assertTrue(reply.contains("session:"));
+
+        // Everything after the first "session:" key, i.e. the raw header value onwards.
+        final String sessionKey = "session:";
+        final String afterSessionKey = reply.substring(reply.indexOf(sessionKey) + sessionKey.length());
+
+        LOG.info("session header follows: " + afterSessionKey);
+        assertTrue(afterSessionKey.startsWith("ID:"));
+
+        stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+    }
+
+    /**
+     * Offers versions 1.0 and 1.1 in {@code accept-version} and checks that the
+     * CONNECTED reply reports version 1.1.
+     */
+    @Test
+    public void testConnectWithVersionOptions() throws Exception {
+
+        final String handshake = "STOMP\n" +
+                                 "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:1.0,1.1\n" +
+                                 "host:localhost\n" +
+                                 "\n" + Stomp.NULL;
+        stompConnection.sendFrame(handshake);
+
+        final String reply = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + reply);
+
+        assertTrue(reply.startsWith("CONNECTED"));
+        assertTrue(reply.contains("version:1.1"));
+        assertTrue(reply.contains("session:"));
+
+        stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+    }
+
+    /**
+     * Offers versions 1.0 and 10.1 in {@code accept-version} and checks that
+     * the CONNECTED reply reports version 1.0.
+     */
+    @Test
+    public void testConnectWithValidFallback() throws Exception {
+
+        final String handshake = "STOMP\n" +
+                                 "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:1.0,10.1\n" +
+                                 "host:localhost\n" +
+                                 "\n" + Stomp.NULL;
+        stompConnection.sendFrame(handshake);
+
+        final String reply = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + reply);
+
+        assertTrue(reply.startsWith("CONNECTED"));
+        assertTrue(reply.contains("version:1.0"));
+        assertTrue(reply.contains("session:"));
+
+        stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+    }
+
+    /**
+     * Offers only versions 9.0 and 10.1 in {@code accept-version} and checks
+     * that the broker answers with an ERROR frame that mentions "version" and
+     * carries a message header.
+     */
+    @Test
+    public void testConnectWithInvalidFallback() throws Exception {
+
+        final String handshake = "STOMP\n" +
+                                 "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:9.0,10.1\n" +
+                                 "host:localhost\n" +
+                                 "\n" + Stomp.NULL;
+        stompConnection.sendFrame(handshake);
+
+        final String reply = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + reply);
+
+        assertTrue(reply.startsWith("ERROR"));
+        assertTrue(reply.contains("version"));
+        assertTrue(reply.contains("message:"));
+    }
+
+    /**
+     * Connects with {@code heart-beat:0,1000} and verifies, by reading the raw
+     * socket, that two consecutive newline heart-beat bytes arrive and that
+     * each one took at least 900 ms to show up.
+     */
+    @Test
+    public void testHeartbeats() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:0,1000\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(connectFrame);
+        String f = stompConnection.receiveFrame().trim();
+
+        LOG.info("Broker sent: " + f);
+
+        assertTrue("Failed to receive a connected frame.", f.startsWith("CONNECTED"));
+        assertTrue("Frame should have a version 1.1 header.", f.indexOf("version:1.1") >= 0);
+        assertTrue("Frame should have a heart beat header.", f.indexOf("heart-beat:") >= 0);
+        assertTrue("Frame should have a session header.", f.indexOf("session:") >= 0);
+
+        stompConnection.getStompSocket().getOutputStream().write('\n');
+
+        DataInputStream in = new DataInputStream(stompConnection.getStompSocket().getInputStream());
+        // NOTE(review): the first byte is discarded unchecked; presumably it is a
+        // heart-beat whose timing relative to the CONNECTED frame is not reliable.
+        in.read();
+
+        // Two consecutive heart-beats must each be a newline and must not arrive
+        // noticeably faster than the negotiated 1000 ms interval.
+        for (int beat = 0; beat < 2; beat++) {
+            long startTime = System.currentTimeMillis();
+            int input = in.read();
+            assertEquals("did not receive the correct heart beat value", '\n', input);
+            long endTime = System.currentTimeMillis();
+            assertTrue("Broker did not send KeepAlive in time", (endTime - startTime) >= 900);
+        }
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    /**
+     * Connects with {@code heart-beat:1000,0}, then sends nothing further and
+     * expects the next read to fail. Also checks that the failure did not
+     * happen in under 1000 ms.
+     */
+    @Test
+    public void testHeartbeatsDropsIdleConnection() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:1000,0\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(connectFrame);
+        String f = stompConnection.receiveFrame();
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.1") >= 0);
+        assertTrue(f.indexOf("heart-beat:") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+        LOG.debug("Broker sent: " + f);
+
+        long startTime = System.currentTimeMillis();
+
+        try {
+            f = stompConnection.receiveFrame();
+            LOG.debug("Broker sent: " + f);
+            fail("Received a frame on a connection that should have been dropped as idle");
+        } catch (Exception expected) {
+            // The read is expected to fail because the client never sends a
+            // heart-beat. fail() throws AssertionError, so it is not swallowed here.
+            LOG.debug("Read on idle connection failed as expected: " + expected);
+        }
+
+        long endTime = System.currentTimeMillis();
+        assertTrue("Broker dropped the idle connection before the heart-beat interval elapsed.", (endTime - startTime) >= 1000);
+    }
+
+    /**
+     * Connects with {@code heart-beat:2000,0}, sends one message, then sends a
+     * keep-alive every second for 20 seconds and verifies the connection is
+     * still usable by subscribing and receiving that message.
+     */
+    @Test
+    public void testHeartbeatsKeepsConnectionOpen() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "heart-beat:2000,0\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(connectFrame);
+        String f = stompConnection.receiveFrame();
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.1") >= 0);
+        assertTrue(f.indexOf("heart-beat:") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+        LOG.debug("Broker sent: " + f);
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+
+        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
+
+        // Always stop the keep-alive thread, even when an assertion or I/O call
+        // below fails; otherwise the non-daemon executor thread outlives the test.
+        try {
+            service.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        LOG.info("Sending next KeepAlive");
+                        stompConnection.keepAlive();
+                    } catch (Exception e) {
+                        // Best effort: keep the schedule running, but leave a trace.
+                        LOG.debug("Failed to send KeepAlive", e);
+                    }
+                }
+            }, 1, 1, TimeUnit.SECONDS);
+
+            TimeUnit.SECONDS.sleep(20);
+
+            String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                           "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+            stompConnection.sendFrame(frame);
+
+            StompFrame stompFrame = stompConnection.receive();
+            assertTrue(stompFrame.getAction().equals("MESSAGE"));
+        } finally {
+            service.shutdownNow();
+        }
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    /**
+     * Connects with {@code heart-beat:1000,0}, stays silent for ten seconds and
+     * then expects a SEND (with receipt) followed by a read to fail with an exception.
+     */
+    @Test
+    public void testSendAfterMissingHeartbeat() throws Exception {
+
+        final String handshake = "STOMP\n" + "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:1.1\n" +
+                                 "heart-beat:1000,0\n" +
+                                 "host:localhost\n" +
+                                 "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(handshake);
+        final String reply = stompConnection.receiveFrame();
+        assertTrue(reply.startsWith("CONNECTED"));
+        assertTrue(reply.contains("version:1.1"));
+        assertTrue(reply.contains("heart-beat:"));
+        assertTrue(reply.contains("session:"));
+        LOG.debug("Broker sent: " + reply);
+
+        // Send nothing for ten seconds, well past the 1000 ms client heart-beat interval.
+        TimeUnit.SECONDS.sleep(10);
+
+        try {
+            final String lateSend = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
+                                    "receipt:1\n\n" + "Hello World" + Stomp.NULL;
+            stompConnection.sendFrame(lateSend);
+            stompConnection.receiveFrame();
+            fail("SEND frame has been accepted after missing heart beat");
+        } catch (Exception expected) {
+            LOG.info(expected.getMessage());
+        }
+    }
+
+    /**
+     * Sends a heart-beat header with a single value ({@code heart-beat:0}) and
+     * expects an ERROR frame that mentions "heart-beat" and has a message header.
+     */
+    @Test
+    public void testRejectInvalidHeartbeats1() throws Exception {
+
+        final String handshake = "STOMP\n" +
+                                 "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:1.1\n" +
+                                 "heart-beat:0\n" +
+                                 "host:localhost\n" +
+                                 "\n" + Stomp.NULL;
+        stompConnection.sendFrame(handshake);
+
+        final String reply = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + reply);
+
+        assertTrue(reply.startsWith("ERROR"));
+        assertTrue(reply.contains("heart-beat"));
+        assertTrue(reply.contains("message:"));
+    }
+
+    /**
+     * Sends a heart-beat header with a non-numeric value ({@code heart-beat:T,0})
+     * and expects an ERROR frame that mentions "heart-beat" and has a message header.
+     */
+    @Test
+    public void testRejectInvalidHeartbeats2() throws Exception {
+
+        final String handshake = "STOMP\n" +
+                                 "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:1.1\n" +
+                                 "heart-beat:T,0\n" +
+                                 "host:localhost\n" +
+                                 "\n" + Stomp.NULL;
+        stompConnection.sendFrame(handshake);
+
+        final String reply = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + reply);
+
+        assertTrue(reply.startsWith("ERROR"));
+        assertTrue(reply.contains("heart-beat"));
+        assertTrue(reply.contains("message:"));
+    }
+
+    /**
+     * Sends a heart-beat header with three values ({@code heart-beat:100,10,50})
+     * and expects an ERROR frame that mentions "heart-beat" and has a message header.
+     */
+    @Test
+    public void testRejectInvalidHeartbeats3() throws Exception {
+
+        final String handshake = "STOMP\n" +
+                                 "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:1.1\n" +
+                                 "heart-beat:100,10,50\n" +
+                                 "host:localhost\n" +
+                                 "\n" + Stomp.NULL;
+        stompConnection.sendFrame(handshake);
+
+        final String reply = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + reply);
+
+        assertTrue(reply.startsWith("ERROR"));
+        assertTrue(reply.contains("heart-beat"));
+        assertTrue(reply.contains("message:"));
+    }
+
+    /**
+     * Sends a message, subscribes and receives it, unsubscribes, sends the same
+     * message again and expects the next read to time out because no
+     * subscription remains.
+     */
+    @Test
+    public void testSubscribeAndUnsubscribe() throws Exception {
+
+        final String handshake = "STOMP\n" +
+                                 "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:1.1\n" +
+                                 "host:localhost\n" +
+                                 "\n" + Stomp.NULL;
+        stompConnection.sendFrame(handshake);
+
+        final String reply = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + reply);
+
+        assertTrue(reply.startsWith("CONNECTED"));
+
+        final String hello = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+        stompConnection.sendFrame(hello);
+
+        final String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                                 "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(subscribe);
+
+        final StompFrame delivered = stompConnection.receive();
+        assertTrue(delivered.getAction().equals("MESSAGE"));
+
+        final String unsubscribe = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                                   "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(unsubscribe);
+
+        Thread.sleep(4000);
+
+        // Same frame as before; nothing should be delivered now.
+        stompConnection.sendFrame(hello);
+
+        try {
+            final String unexpected = stompConnection.receiveFrame();
+            LOG.info("Received frame: " + unexpected);
+            fail("No message should have been received since subscription was removed");
+        } catch (SocketTimeoutException expected) {
+            // Timing out is the success path: no frame was delivered.
+        }
+
+        stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+    }
+
+    @Test
+    public void testSubscribeWithNoId() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("ERROR"));
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    @Test
+    public void testUnsubscribeWithNoId() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        Thread.sleep(2000);
+
+        frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("ERROR"));
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    /**
+     * Receives a message on a client-ack subscription and acknowledges it with
+     * an ACK frame that carries both the subscription and message-id headers.
+     */
+    @Test
+    public void testAckMessageWithId() throws Exception {
+
+        final String handshake = "STOMP\n" +
+                                 "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:1.1\n" +
+                                 "host:localhost\n" +
+                                 "\n" + Stomp.NULL;
+        stompConnection.sendFrame(handshake);
+
+        final String reply = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + reply);
+
+        assertTrue(reply.startsWith("CONNECTED"));
+
+        final String hello = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+        stompConnection.sendFrame(hello);
+
+        final String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                                 "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(subscribe);
+
+        final StompFrame delivered = stompConnection.receive();
+        assertTrue(delivered.getAction().equals("MESSAGE"));
+
+        final String ack = "ACK\n" + "subscription:12345\n" + "message-id:" +
+                           delivered.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(ack);
+
+        final String unsubscribe = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                                   "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(unsubscribe);
+
+        stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+    }
+
+    @Test
+    public void testAckMessageWithNoId() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(message);
+
+        String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                           "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame received = stompConnection.receive();
+        assertTrue(received.getAction().equals("MESSAGE"));
+
+        String ack = "ACK\n" + "message-id:" +
+                     received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(ack);
+
+        StompFrame error = stompConnection.receive();
+        assertTrue(error.getAction().equals("ERROR"));
+
+        String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(unsub);
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    /**
+     * Sends {@code MSG_COUNT} messages (each confirmed by receipt), browses the
+     * queue with a {@code browser:true} subscription and expects every message
+     * followed by a frame whose browser header is "end". It then subscribes
+     * normally and expects all messages again, showing the browse did not
+     * consume them.
+     */
+    @Test
+    public void testQueueBrowerSubscription() throws Exception {
+
+        final int MSG_COUNT = 10;
+
+        String connectFrame = "STOMP\n" +
+                              "login:system\n" +
+                              "passcode:manager\n" +
+                              "accept-version:1.1\n" +
+                              "host:localhost\n" +
+                              "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        for(int i = 0; i < MSG_COUNT; ++i) {
+            String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" +
+                             "receipt:0\n" +
+                             "\n" + "Hello World {" + i + "}" + Stomp.NULL;
+            stompConnection.sendFrame(message);
+            // Wait for the receipt so each message is accepted before the next send.
+            StompFrame response = stompConnection.receive();
+            assertEquals("0", response.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
+        }
+
+        String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                           "id:12345\n" + "browser:true\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(subscribe);
+
+        for(int i = 0; i < MSG_COUNT; ++i) {
+            StompFrame message = stompConnection.receive();
+            assertEquals(Stomp.Responses.MESSAGE, message.getAction());
+            assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+        }
+
+        // We should now get a browse done message
+        StompFrame browseDone = stompConnection.receive();
+        LOG.debug("Browse Done: " + browseDone.toString());
+        assertEquals(Stomp.Responses.MESSAGE, browseDone.getAction());
+        assertEquals("12345", browseDone.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+        assertEquals("end", browseDone.getHeaders().get(Stomp.Headers.Message.BROWSER));
+        assertNotNull(browseDone.getHeaders().get(Stomp.Headers.Message.DESTINATION));
+
+        String unsub = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                       "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(unsub);
+
+        Thread.sleep(2000);
+
+        // A regular (non-browser) subscription must still see every message.
+        subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(subscribe);
+
+        for(int i = 0; i < MSG_COUNT; ++i) {
+            StompFrame message = stompConnection.receive();
+            assertEquals(Stomp.Responses.MESSAGE, message.getAction());
+            assertEquals("12345", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+        }
+
+        stompConnection.sendFrame(unsub);
+
+        String frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    /**
+     * Sends a STOMP 1.1 message whose headers contain escape sequences and
+     * checks, via a JMS consumer, the resulting correlation id, type, priority,
+     * custom properties and group id.
+     */
+    @Test
+    public void testSendMessageWithStandardHeadersEncoded() throws Exception {
+
+        final MessageConsumer jmsConsumer = session.createConsumer(queue);
+
+        final String connect = "CONNECT\n" + "login:system\n" + "passcode:manager\n" +
+                "accept-version:1.1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(connect);
+
+        final String connected = stompConnection.receiveFrame();
+        assertTrue(connected.startsWith("CONNECTED"));
+
+        final String send = "SEND\n" +
+                "correlation-id:c1\\:\\n\\23\n" +
+                "priority:3\n" +
+                "type:t34:5\n" +
+                "JMSXGroupID:abc\n" +
+                "foo:a\\bc\n" +
+                "bar:123\n" +
+                "destination:/queue/" + getQueueName() + "\n\n" +
+                "Hello World" + Stomp.NULL;
+        stompConnection.sendFrame(send);
+
+        final TextMessage jmsMessage = (TextMessage)jmsConsumer.receive(2500);
+        assertNotNull(jmsMessage);
+        assertEquals("Hello World", jmsMessage.getText());
+        assertEquals("JMSCorrelationID", "c1\\:\n\\23", jmsMessage.getJMSCorrelationID());
+        assertEquals("getJMSType", "t34:5", jmsMessage.getJMSType());
+        assertEquals("getJMSPriority", 3, jmsMessage.getJMSPriority());
+        assertEquals("foo", "a\\bc", jmsMessage.getStringProperty("foo"));
+        assertEquals("bar", "123", jmsMessage.getStringProperty("bar"));
+
+        assertEquals("JMSXGroupID", "abc", jmsMessage.getStringProperty("JMSXGroupID"));
+        final ActiveMQTextMessage nativeMessage = (ActiveMQTextMessage)jmsMessage;
+        assertEquals("GroupID", "abc", nativeMessage.getGroupID());
+
+        stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+    }
+
+    /**
+     * Sends a message with the {@code value} header repeated three times and
+     * checks that the JMS consumer sees the first occurrence ("newest").
+     */
+    @Test
+    public void testSendMessageWithRepeatedEntries() throws Exception {
+
+        final MessageConsumer jmsConsumer = session.createConsumer(queue);
+
+        final String connect = "CONNECT\n" + "login:system\n" + "passcode:manager\n" +
+                "accept-version:1.1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(connect);
+
+        final String connected = stompConnection.receiveFrame();
+        assertTrue(connected.startsWith("CONNECTED"));
+
+        final String send = "SEND\n" +
+                "value:newest" + "\n" +
+                "value:older" + "\n" +
+                "value:oldest" + "\n" +
+                "destination:/queue/" + getQueueName() +
+                "\n\n" + "Hello World" + Stomp.NULL;
+        stompConnection.sendFrame(send);
+
+        final TextMessage jmsMessage = (TextMessage)jmsConsumer.receive(2500);
+        assertNotNull(jmsMessage);
+        assertEquals("Hello World", jmsMessage.getText());
+        assertEquals("newest", jmsMessage.getStringProperty("value"));
+
+        stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+    }
+
+    /**
+     * Publishes a JMS message whose string property {@code s} contains a
+     * backslash and a colon, and checks that the value appears escaped
+     * ({@code \\} and {@code \c}) in the MESSAGE frame delivered to the STOMP
+     * 1.1 subscriber.
+     */
+    @Test
+    public void testSubscribeWithMessageSentWithEncodedProperties() throws Exception {
+
+        String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n" +  "accept-version:1.1" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        MessageProducer producer = session.createProducer(queue);
+        TextMessage message = session.createTextMessage("Hello World");
+        message.setStringProperty("s", "\\value:");
+        producer.send(message);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue("" + frame, frame.startsWith("MESSAGE"));
+
+        // Fail clearly when the header is missing instead of comparing an
+        // unrelated slice of the frame (or running off its end).
+        final int headerAt = frame.indexOf("\ns:");
+        assertTrue("Header 's' missing from frame: " + frame, headerAt >= 0);
+
+        int start = headerAt + 3;
+        final String expectedEncoded = "\\\\value\\c";
+        assertTrue("Header 's' value truncated in frame: " + frame, frame.length() >= start + expectedEncoded.length());
+        final String headerVal = frame.substring(start, start + expectedEncoded.length());
+        assertEquals("" + frame, expectedEncoded, headerVal);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    /**
+     * Sends a persistent message, receives it on a client-ack subscription and
+     * NACKs it. It then subscribes to {@code ActiveMQ.DLQ} and expects a frame
+     * with the same message-id, which it acknowledges.
+     */
+    @Test
+    public void testNackMessage() throws Exception {
+
+        String connectFrame = "STOMP\n" +
+                "login:system\n" +
+                "passcode:manager\n" +
+                "accept-version:1.1\n" +
+                "host:localhost\n" +
+                "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String f = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\npersistent:true\n\n" + "Hello World" + Stomp.NULL;
+
+        stompConnection.sendFrame(message);
+
+        String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame received = stompConnection.receive();
+        assertTrue(received.getAction().equals("MESSAGE"));
+
+        // nack it
+        frame = "NACK\n" + "subscription:12345\n" + "message-id:" +
+                received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        //consume it from dlq
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/ActiveMQ.DLQ\n" +
+                "id:12345\n" + "ack:client\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        StompFrame receivedDLQ = stompConnection.receive(200);
+        assertNotNull("No frame received from the DLQ", receivedDLQ);
+        // Expected value first: the id of the message that was NACKed.
+        assertEquals(received.getHeaders().get("message-id"), receivedDLQ.getHeaders().get("message-id"));
+
+        frame = "ACK\n" + "subscription:12345\n" + "message-id:" +
+                received.getHeaders().get("message-id") + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "UNSUBSCRIBE\n" + "destination:/queue/ActiveMQ.DLQ\n" +
+                "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    /**
+     * Sends a message whose custom header values have leading and/or trailing
+     * spaces and checks that the MESSAGE frame delivered back carries those
+     * values with the spaces intact.
+     */
+    @Test
+    public void testHeaderValuesAreNotWSTrimmed() throws Exception {
+        stompConnection.setVersion(Stomp.V1_1);
+        final String handshake = "STOMP\n" +
+                                 "login:system\n" +
+                                 "passcode:manager\n" +
+                                 "accept-version:1.1\n" +
+                                 "host:localhost\n" +
+                                 "\n" + Stomp.NULL;
+        stompConnection.sendFrame(handshake);
+
+        final String reply = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + reply);
+
+        assertTrue(reply.startsWith("CONNECTED"));
+
+        final String padded = "SEND\n" + "destination:/queue/" + getQueueName() +
+                              "\ntest1: value" +
+                              "\ntest2:value " +
+                              "\ntest3: value " +
+                              "\n\n" + "Hello World" + Stomp.NULL;
+        stompConnection.sendFrame(padded);
+
+        final String subscribe = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                                 "id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(subscribe);
+
+        final StompFrame delivered = stompConnection.receive();
+        assertTrue(delivered.getAction().equals("MESSAGE"));
+
+        assertEquals(" value", delivered.getHeaders().get("test1"));
+        assertEquals("value ", delivered.getHeaders().get("test2"));
+        assertEquals(" value ", delivered.getHeaders().get("test3"));
+
+        final String unsubscribe = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
+                                   "id:12345\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(unsubscribe);
+
+        stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+    }
+
+    /**
+     * Creates two durable topic subscriptions on one connection, disconnects, and then
+     * reconnects with the same client-id to remove both subscriptions again with
+     * UNSUBSCRIBE frames. After every step the broker's active and inactive durable
+     * subscriber counts, read through a {@link BrokerViewMBean} proxy, are checked.
+     */
+    @Test
+    public void testDurableSubAndUnSubOnTwoTopics() throws Exception {
+        stompConnection.setVersion(Stomp.V1_1);
+
+        String domain = "org.apache.activemq";
+        ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
+
+        // JMX proxy used to read the broker's durable subscriber lists.
+        BrokerViewMBean view = (BrokerViewMBean)brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
+
+        String connectFrame = "STOMP\n" +
+                "login:system\n" + "passcode:manager\n" + "accept-version:1.1\n" +
+                "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+
+        assertTrue(frame.startsWith("CONNECTED"));
+        assertEquals(0, view.getDurableTopicSubscribers().length);
+
+        // subscribe to first destination durably
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "1" + "\n" +
+                "ack:auto\n" + "receipt:1\n" + "id:durablesub-1\n" +
+                "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("1", receipt.getHeaders().get("receipt-id"));
+        assertEquals(1, view.getDurableTopicSubscribers().length);
+
+        // subscribe to second destination durably
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "2" + "\n" +
+                "ack:auto\n" + "receipt:2\n" + "id:durablesub-2\n" +
+                "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("2", receipt.getHeaders().get("receipt-id"));
+        assertEquals(2, view.getDurableTopicSubscribers().length);
+
+        frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        // Short pause before reconnecting; presumably gives the broker time to
+        // register the disconnect (the 400 ms figure is not derived from anything here).
+        try {
+            Thread.sleep(400);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller instead of dropping it.
+            Thread.currentThread().interrupt();
+        }
+
+        // reconnect with the same client-id; both subscriptions should now be
+        // reported as inactive, and are then removed one at a time.
+        stompConnect();
+        stompConnection.sendFrame(connectFrame);
+        frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(frame.startsWith("CONNECTED"));
+        assertEquals(0, view.getDurableTopicSubscribers().length);
+        assertEquals(2, view.getInactiveDurableTopicSubscribers().length);
+
+        // unsubscribe from topic 1
+        frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "1\n" +
+                "id:durablesub-1\n" + "receipt:3\n" +
+                "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        receipt = stompConnection.receive();
+        // Log what the broker returned, not the frame that was just sent.
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("3", receipt.getHeaders().get("receipt-id"));
+
+        assertEquals(1, view.getInactiveDurableTopicSubscribers().length);
+
+        // unsubscribe from topic 2
+        frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "2\n" +
+                "id:durablesub-2\n" + "receipt:4\n" +
+                "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("4", receipt.getHeaders().get("receipt-id"));
+
+        assertEquals(0, view.getInactiveDurableTopicSubscribers().length);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+
+    /**
+     * Creates two durable topic subscriptions, disconnects, and then from a new
+     * connection with the same client-id publishes one message to each topic while the
+     * subscriptions are offline. Each subscription is then re-created in turn and is
+     * expected to receive a MESSAGE frame tagged with its own subscription id. The
+     * broker's active and inactive durable subscriber counts, read through a
+     * {@link BrokerViewMBean} proxy, are checked after every step.
+     */
+    @Test
+    public void testMultipleDurableSubsWithOfflineMessages() throws Exception {
+        stompConnection.setVersion(Stomp.V1_1);
+
+        String domain = "org.apache.activemq";
+        ObjectName brokerName = new ObjectName(domain + ":Type=Broker,BrokerName=localhost");
+
+        // JMX proxy used to read the broker's durable subscriber lists.
+        BrokerViewMBean view = (BrokerViewMBean)brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
+
+        String connectFrame = "STOMP\n" + "login:system\n" + "passcode:manager\n" +
+                "accept-version:1.1\n" + "host:localhost\n" + "client-id:test\n" + "\n" + Stomp.NULL;
+        stompConnection.sendFrame(connectFrame);
+
+        String frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+
+        assertTrue(frame.startsWith("CONNECTED"));
+        assertEquals(0, view.getDurableTopicSubscribers().length);
+
+        // subscribe to first destination durably
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "1" + "\n" +
+                "ack:auto\n" + "receipt:1\n" + "id:durablesub-1\n" +
+                "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("1", receipt.getHeaders().get("receipt-id"));
+        assertEquals(1, view.getDurableTopicSubscribers().length);
+
+        // subscribe to second destination durably
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "2" + "\n" +
+                "ack:auto\n" + "receipt:2\n" + "id:durablesub-2\n" +
+                "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("2", receipt.getHeaders().get("receipt-id"));
+        assertEquals(2, view.getDurableTopicSubscribers().length);
+
+        frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        // Short pause before reconnecting; presumably gives the broker time to
+        // register the disconnect (the 400 ms figure is not derived from anything here).
+        try {
+            Thread.sleep(400);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller instead of dropping it.
+            Thread.currentThread().interrupt();
+        }
+
+        // reconnect and send some messages to the offline subscribers and then try to get
+        // them after subscribing again.
+        stompConnect();
+        stompConnection.sendFrame(connectFrame);
+        frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(frame.startsWith("CONNECTED"));
+        assertEquals(0, view.getDurableTopicSubscribers().length);
+        assertEquals(2, view.getInactiveDurableTopicSubscribers().length);
+
+        // publish one message to each topic, waiting for the receipt of each SEND
+        frame = "SEND\n" + "destination:/topic/" + getQueueName() + "1\n" +
+                "receipt:10\n" + "\n" + "Hello World 1" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        receipt = stompConnection.receive();
+        assertEquals("10", receipt.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
+
+        frame = "SEND\n" + "destination:/topic/" + getQueueName() + "2\n" +
+                "receipt:11\n" + "\n" + "Hello World 2" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+        receipt = stompConnection.receive();
+        assertEquals("11", receipt.getHeaders().get(Stomp.Headers.Response.RECEIPT_ID));
+
+        // re-create the first durable subscription
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "1" + "\n" +
+                "ack:auto\n" + "receipt:3\n" + "id:durablesub-1\n" +
+                "activemq.subscriptionName:test1\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("3", receipt.getHeaders().get("receipt-id"));
+        assertEquals(1, view.getDurableTopicSubscribers().length);
+
+        // the next frame is expected to be a message for subscription 1
+        StompFrame message = stompConnection.receive();
+        assertEquals(Stomp.Responses.MESSAGE, message.getAction());
+        assertEquals("durablesub-1", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+
+        assertEquals(1, view.getDurableTopicSubscribers().length);
+        assertEquals(1, view.getInactiveDurableTopicSubscribers().length);
+
+        // re-create the second durable subscription
+        frame = "SUBSCRIBE\n" +
+                "destination:/topic/" + getQueueName() + "2" + "\n" +
+                "ack:auto\n" + "receipt:4\n" + "id:durablesub-2\n" +
+                "activemq.subscriptionName:test2\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        receipt = stompConnection.receive();
+        LOG.debug("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("4", receipt.getHeaders().get("receipt-id"));
+        assertEquals(2, view.getDurableTopicSubscribers().length);
+
+        // the next frame is expected to be a message for subscription 2
+        message = stompConnection.receive();
+        assertEquals(Stomp.Responses.MESSAGE, message.getAction());
+        assertEquals("durablesub-2", message.getHeaders().get(Stomp.Headers.Message.SUBSCRIPTION));
+
+        assertEquals(2, view.getDurableTopicSubscribers().length);
+        assertEquals(0, view.getInactiveDurableTopicSubscribers().length);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+    }
+}

Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp11Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.broker.TransportConnector;
+
+/**
+ * Runs the tests inherited from {@link Stomp12Test} over a "stomp+nio+ssl"
+ * transport connector, connecting through the JVM default SSL socket factory.
+ */
+public class Stomp12NIOSSLTest extends Stomp12Test {
+
+    /**
+     * Sets the javax.net.ssl key store and trust store system properties and then
+     * delegates to the inherited set-up.
+     */
+    @Override
+    public void setUp() throws Exception {
+        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+        System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        super.setUp();
+    }
+
+    /**
+     * Adds a "stomp+nio+ssl" connector on the port currently held in nioSslPort and
+     * stores the port reported by the connector's connect URI back into that field.
+     */
+    @Override
+    protected void addStompConnector() throws Exception {
+        final String bindUri = "stomp+nio+ssl://0.0.0.0:" + nioSslPort;
+        TransportConnector stompConnector = brokerService.addConnector(bindUri);
+        nioSslPort = stompConnector.getConnectUri().getPort();
+    }
+
+    /**
+     * Opens a socket to 127.0.0.1 on nioSslPort using the default SSL socket factory.
+     */
+    @Override
+    protected Socket createSocket() throws IOException {
+        SocketFactory sslSockets = SSLSocketFactory.getDefault();
+        return sslSockets.createSocket("127.0.0.1", nioSslPort);
+    }
+}

Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import org.apache.activemq.broker.TransportConnector;
+
+/**
+ * Runs the tests inherited from {@link Stomp12Test} over a "stomp+nio"
+ * transport connector.
+ */
+public class Stomp12NIOTest extends Stomp12Test {
+
+    /**
+     * Adds a "stomp+nio" connector on the port currently held in nioPort and stores
+     * the port reported by the connector's connect URI back into that field.
+     */
+    @Override
+    protected void addStompConnector() throws Exception {
+        final String bindUri = "stomp+nio://0.0.0.0:" + nioPort;
+        TransportConnector stompConnector = brokerService.addConnector(bindUri);
+        nioPort = stompConnector.getConnectUri().getPort();
+    }
+
+    /**
+     * Opens a plain socket to 127.0.0.1 on nioPort.
+     */
+    @Override
+    protected Socket createSocket() throws IOException {
+        Socket plainSocket = new Socket("127.0.0.1", nioPort);
+        return plainSocket;
+    }
+}

Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java?rev=1408161&view=auto
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java (added)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java Mon Nov 12 00:15:50 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.activemq.broker.TransportConnector;
+
+/**
+ * Runs the tests inherited from {@link Stomp12Test} over SSL connectors that are
+ * configured with needClientAuth=true.
+ */
+public class Stomp12SslAuthTest extends Stomp12Test {
+
+    /**
+     * Sets the javax.net.ssl key store and trust store system properties and then
+     * delegates to the inherited set-up.
+     */
+    @Override
+    public void setUp() throws Exception {
+        System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+        // Uncomment to trace the SSL handshake while debugging:
+        //System.setProperty("javax.net.debug","ssl,handshake");
+        super.setUp();
+    }
+
+    /**
+     * Adds an "ssl" connector on an ephemeral port (port 0) with needClientAuth=true
+     * and stores its publishable connect string in jmsUri.
+     */
+    @Override
+    protected void addOpenWireConnector() throws Exception {
+        TransportConnector connector = brokerService.addConnector(
+                "ssl://0.0.0.0:0?needClientAuth=true");
+        jmsUri = connector.getPublishableConnectString();
+    }
+
+    /**
+     * Adds a "stomp+ssl" connector with needClientAuth=true on the port currently
+     * held in sslPort and stores the port reported by the connector's connect URI
+     * back into that field.
+     */
+    @Override
+    protected void addStompConnector() throws Exception {
+        // Read the same field that is written below and used by createSocket(),
+        // matching how the NIO and NIO+SSL variants treat their own port fields.
+        TransportConnector connector = brokerService.addConnector(
+                "stomp+ssl://0.0.0.0:"+sslPort+"?needClientAuth=true");
+        sslPort = connector.getConnectUri().getPort();
+    }
+
+    /**
+     * Opens a socket to 127.0.0.1 on sslPort using the default SSL socket factory.
+     */
+    @Override
+    protected Socket createSocket() throws IOException {
+        SocketFactory factory = SSLSocketFactory.getDefault();
+        return factory.createSocket("127.0.0.1", this.sslPort);
+    }
+}

Propchange: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java
------------------------------------------------------------------------------
    svn:eol-style = native