You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/27 18:04:02 UTC

svn commit: r1366431 [1/2] - in /activemq/activemq-apollo/trunk: ./ apollo-amqp/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/ apollo-broker/src/test/scala/ apol...

Author: chirino
Date: Fri Jul 27 16:04:01 2012
New Revision: 1366431

URL: http://svn.apache.org/viewvc?rev=1366431&view=rev
Log:
Changes needed to get most of the stomp tests running in parallel.

Added:
    activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/scalatest/
    activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/scalatest/ParallelBeforeAndAfterAll.scala
    activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/scalatest/junit/
    activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/scalatest/junit/ParallelJUnitRunner.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-itests/pom.xml
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala
    activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
    activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
    activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/BrokerResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ConfigurationResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/SessionResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/Support.scala
    activemq/activemq-apollo/trunk/pom.xml

Modified: activemq/activemq-apollo/trunk/apollo-amqp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-amqp/pom.xml?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-amqp/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-amqp/pom.xml Fri Jul 27 16:04:01 2012
@@ -191,6 +191,17 @@
         </executions>
       </plugin>
       
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin-version}</version>
+        <configuration>
+          <parallel>classes</parallel>
+          <perCoreThreadCount>false</perCoreThreadCount>
+          <threadCount>1</threadCount> 
+        </configuration>
+      </plugin>
+      
     </plugins>
   </build>
   <profiles>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Fri Jul 27 16:04:01 2012
@@ -309,6 +309,8 @@ class VirtualHost(val broker: Broker, va
     })
   }
   
+  import FutureResult._
+
   def get_queue_metrics:FutureResult[AggregateDestMetricsDTO] = {
     val queues:Iterable[Queue] = local_router.local_queue_domain.destinations
     val metrics = sync_all (queues) { queue =>

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/protocol/RawProtocol.scala Fri Jul 27 16:04:01 2012
@@ -83,7 +83,7 @@ case class RawMessage(payload:Buffer) ex
     } else if( toType == classOf[ByteBuffer] ) {
       toType.cast(payload.toByteBuffer)
     } else {
-      null
+      null.asInstanceOf[T]
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/BrokerFunSuiteSupport.scala Fri Jul 27 16:04:01 2012
@@ -22,8 +22,12 @@ import org.apache.activemq.apollo.util._
 import FileSupport._
 import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.dto.{AggregateDestMetricsDTO, QueueStatusDTO, TopicStatusDTO}
+import collection.immutable.HashMap
+import java.io.File
+import org.scalatest.{ParallelTestExecution, OneInstancePerTest}
 
 object BrokerTestSupport {
+  import FutureResult._
 
   def connector_port(broker:Broker, connector: String): Option[Int] = Option(connector).map {
     id => broker.connectors.get(id).map(_.socket_address.asInstanceOf[InetSocketAddress].getPort).getOrElse(0)
@@ -106,7 +110,21 @@ object BrokerTestSupport {
   def webadmin_uri(broker:Broker, scheme:String) = {
     Option(broker.web_server).flatMap(_.uris().find(_.getScheme == scheme)).get
   }
+
 }
+
+trait BrokerParallelTestExecution extends ParallelTestExecution {
+  self: BrokerFunSuiteSupport =>
+
+  override def newInstance = {
+    val rc = super.newInstance.asInstanceOf[BrokerFunSuiteSupport]
+    rc.broker = broker
+    rc.port = port
+    rc
+  }
+
+}
+
 /**
  * <p>
  * </p>
@@ -118,19 +136,14 @@ class BrokerFunSuiteSupport extends FunS
   var port = 0
 
   def broker_config_uri = "xml:classpath:apollo.xml"
-
-  def createBroker: Broker = {
-    info("Loading broker configuration from the classpath with URI: " + broker_config_uri)
-    var broker = BrokerFactory.createBroker(broker_config_uri)
-    broker.setTmp(basedir / "target" / "tmp")
-    broker.getTmp().mkdirs()
-    broker
-  }
+  def createBroker = BrokerFactory.createBroker(broker_config_uri)
 
   override def beforeAll() = {
     super.beforeAll()
     try {
       broker = createBroker
+      broker.setTmp(test_data_dir / "tmp")
+      broker.getTmp().mkdirs()
       ServiceControl.start(broker)
       port = broker.get_socket_address.asInstanceOf[InetSocketAddress].getPort
     } catch {

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/test/scala/org/apache/activemq/apollo/broker/store/StoreFunSuiteSupport.scala Fri Jul 27 16:04:01 2012
@@ -45,7 +45,7 @@ abstract class StoreFunSuiteSupport exte
    */
 
 
-  def data_directory = basedir / "target" / "apollo-data"
+  def data_directory = test_data_dir / "store"
 
   override protected def beforeAll() = {
     super.beforeAll()

Modified: activemq/activemq-apollo/trunk/apollo-itests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-itests/pom.xml?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-itests/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-itests/pom.xml Fri Jul 27 16:04:01 2012
@@ -161,21 +161,15 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <version>${maven-surefire-plugin-version}</version>
-
         <configuration>
-          <!-- we must turn off the use of system class loader so our tests can find stuff - otherwise ScalaSupport compiler can't find stuff -->
-          <useSystemClassLoader>false</useSystemClassLoader>
-          <forkMode>pertest</forkMode>
-          <childDelegation>false</childDelegation>
-          <useFile>true</useFile>
-          <redirectTestOutputToFile>true</redirectTestOutputToFile>
-          <failIfNoTests>false</failIfNoTests>
-          
           <excludes>
             <!--
             <exclude>**/JmsTopicTransactionTest.*</exclude>
             -->
           </excludes>
+          <parallel>classes</parallel>
+          <perCoreThreadCount>false</perCoreThreadCount>
+          <threadCount>1</threadCount> 
         </configuration>
       </plugin>
 

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala Fri Jul 27 16:04:01 2012
@@ -10,7 +10,8 @@ import org.apache.activemq.apollo.dto.{D
 import org.apache.activemq.apollo.broker.LocalRouter
 import org.fusesource.hawtdispatch.Future
 import scala.Predef._
-import org.apache.activemq.apollo.util.Success
+import org.apache.activemq.apollo.util.{FutureResult, Success}
+import FutureResult._
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>

Modified: activemq/activemq-apollo/trunk/apollo-openwire/pom.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-openwire/pom.xml?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-openwire/pom.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-openwire/pom.xml Fri Jul 27 16:04:01 2012
@@ -195,7 +195,16 @@
           </execution>
         </executions>
       </plugin>
-
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>${maven-surefire-plugin-version}</version>
+        <configuration>
+          <parallel>classes</parallel>
+          <perCoreThreadCount>false</perCoreThreadCount>
+          <threadCount>1</threadCount> 
+        </configuration>
+      </plugin>
     </plugins>
   </build>
   

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-bdb.xml Fri Jul 27 16:04:01 2012
@@ -21,13 +21,27 @@
   <virtual_host id="default">
     <host_name>localhost</host_name>
 
+    <queue id="quota.**" quota="10k"/>
+    <topic id="quota.**" slow_consumer_policy="queue">
+      <subscription quota="10k"/>
+    </topic>
+
     <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
     <queue id="mirrored.**" mirrored="true"/>
+    <topic id="queued.**" slow_consumer_policy="queue">
+      <subscription tail_buffer="4k"/>
+    </topic>
+
+    <queue id="drop.head.persistent" full_policy="drop head" quota="100k"/>
+    <queue id="drop.tail.persistent" full_policy="drop tail" quota="100k"/>
+    <queue id="drop.head.non" full_policy="drop head" tail_buffer="100k" persistent="false"/>
+    <queue id="drop.tail.non" full_policy="drop tail" tail_buffer="100k" persistent="false"/>
 
     <bdb_store directory="${testdatadir}"/>
   </virtual_host>
 
   <!--<web_admin bind="http://0.0.0.0:61680"/>-->
   <connector id="tcp" bind="tcp://0.0.0.0:0"/>
+  <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp-leveldb.xml Fri Jul 27 16:04:01 2012
@@ -21,18 +21,27 @@
   <virtual_host id="default">
     <host_name>localhost</host_name>
 
+    <queue id="quota.**" quota="10k"/>
+    <topic id="quota.**" slow_consumer_policy="queue">
+      <subscription quota="10k"/>
+    </topic>
+
+    <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
+    <queue id="mirrored.**" mirrored="true"/>
+    <topic id="queued.**" slow_consumer_policy="queue">
+      <subscription tail_buffer="4k"/>
+    </topic>
+
     <queue id="drop.head.persistent" full_policy="drop head" quota="100k"/>
     <queue id="drop.tail.persistent" full_policy="drop tail" quota="100k"/>
     <queue id="drop.head.non" full_policy="drop head" tail_buffer="100k" persistent="false"/>
     <queue id="drop.tail.non" full_policy="drop tail" tail_buffer="100k" persistent="false"/>
 
-    <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
-    <queue id="mirrored.**" mirrored="true"/>
-
     <leveldb_store directory="${testdatadir}"/>
   </virtual_host>
 
   <!--<web_admin bind="http://0.0.0.0:61680"/>-->
   <connector id="tcp" bind="tcp://0.0.0.0:0"/>
+  <connector id="udp" bind="udp://0.0.0.0:0" protocol="udp"/>
 
 </broker>
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Fri Jul 27 16:04:01 2012
@@ -17,17 +17,20 @@
 package org.apache.activemq.apollo.stomp
 
 import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.BeforeAndAfterEach
+import org.scalatest._
 import java.lang.String
 import java.util.concurrent.TimeUnit._
+import scala.Some
 import org.apache.activemq.apollo.util._
 import java.util.concurrent.atomic.AtomicLong
 import FileSupport._
 import java.nio.channels.DatagramChannel
 import org.fusesource.hawtbuf.AsciiBuffer
 import org.apache.activemq.apollo.broker._
-import org.apache.activemq.apollo.dto.{TopicStatusDTO, KeyStorageDTO}
+import org.apache.activemq.apollo.dto.KeyStorageDTO
 import java.net.{SocketTimeoutException, InetSocketAddress}
+import org.junit.runner.RunWith
+import scala.Some
 
 class StompTestSupport extends BrokerFunSuiteSupport with ShouldMatchers with BeforeAndAfterEach {
 
@@ -179,7 +182,8 @@ class StompTestSupport extends BrokerFun
 
 /**
  * These test cases check to make sure the broker stats are consistent with what
- * would be expected.
+ * would be expected.  These tests can't be run in parallell since they look at
+ * agreggate destination metrics.
  */
 class StompMetricsTest extends StompTestSupport {
 
@@ -423,16 +427,328 @@ class StompMetricsTest extends StompTest
 
 }
 
+class StompSslTest extends StompTestSupport with BrokerParallelTestExecution {
 
-class Stomp10ConnectTest extends StompTestSupport {
+  override def broker_config_uri: String = "xml:classpath:apollo-stomp-ssl.xml"
 
-  test("Stomp 1.0 CONNECT") {
-    connect("1.0")
+  val config = new KeyStorageDTO
+  config.file = basedir/"src"/"test"/"resources"/"client.ks"
+  config.password = "password"
+  config.key_password = "password"
+
+  client.key_storeage = new KeyStorage(config)
+
+  test("Connect over SSL") {
+    connect("1.1")
+  }
+}
+
+/**
+ * These tests seem to have trouble being run in Parallel
+ */
+class StompSerialTest extends StompTestSupport with BrokerParallelTestExecution {
+
+  // This is the test case for https://issues.apache.org/jira/browse/APLO-88
+  test("ACK then socket close with/without DISCONNECT, should still ACK") {
+    for(i <- 1 until 3) {
+      connect("1.1")
+
+      def send(id:Int) = {
+        client.write(
+          "SEND\n" +
+          "destination:/queue/from-seq-end\n" +
+          "message-id:id-"+i+"-"+id+"\n"+
+          "receipt:0\n"+
+          "\n")
+        wait_for_receipt("0")
+      }
+
+      def get(seq:Long) = {
+        val frame = client.receive()
+        frame should startWith("MESSAGE\n")
+        frame should include("message-id:id-"+i+"-"+seq+"\n")
+        client.write(
+          "ACK\n" +
+          "subscription:0\n" +
+          "message-id:id-"+i+"-"+seq+"\n" +
+          "\n")
+      }
+
+      send(1)
+      send(2)
+
+      client.write(
+        "SUBSCRIBE\n" +
+        "destination:/queue/from-seq-end\n" +
+        "id:0\n" +
+        "ack:client\n"+
+        "\n")
+      get(1)
+      client.write(
+        "DISCONNECT\n" +
+        "\n")
+      client.close
+
+      connect("1.1")
+      client.write(
+        "SUBSCRIBE\n" +
+        "destination:/queue/from-seq-end\n" +
+        "id:0\n" +
+        "ack:client\n"+
+        "\n")
+      get(2)
+      client.close
+    }
+  }
+
+}
+class StompSecurityTest extends StompTestSupport {
+
+  override def broker_config_uri: String = "xml:classpath:apollo-stomp-secure.xml"
+
+  override def is_parallel_test_class: Boolean = false
+
+  override def beforeAll = {
+    try {
+      println("before: "+testName)
+      val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
+      System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
+    } catch {
+      case x:Throwable => x.printStackTrace
+    }
+    super.beforeAll
+  }
+
+  test("Connect with valid id password but can't connect") {
+
+    val frame = connect_request("1.1", client,
+      "login:can_not_connect\n" +
+      "passcode:can_not_connect\n")
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to connect")
+
+  }
+
+  test("Connect with no id password") {
+    val frame = connect_request("1.1", client)
+    frame should startWith("ERROR\n")
+    frame should include("message:Authentication failed.")
+  }
+
+  test("Connect with invalid id password") {
+    val frame = connect_request("1.1", client,
+      "login:foo\n" +
+      "passcode:bar\n")
+    frame should startWith("ERROR\n")
+    frame should include("message:Authentication failed.")
+
+  }
+
+  test("Connect with valid id password that can connect") {
+    connect("1.1", client,
+      "login:can_only_connect\n" +
+      "passcode:can_only_connect\n")
+
+  }
+
+  test("Connector restricted user on the right connector") {
+    connect("1.1", client,
+      "login:connector_restricted\n" +
+      "passcode:connector_restricted\n", "tcp2")
+  }
+
+  test("Connector restricted user on the wrong connector") {
+    val frame = connect_request("1.1", client,
+      "login:connector_restricted\n" +
+      "passcode:connector_restricted\n", "tcp")
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to connect to connector 'tcp'.")
+  }
+
+  test("Send not authorized") {
+    connect("1.1", client,
+      "login:can_only_connect\n" +
+      "passcode:can_only_connect\n")
+
+    client.write(
+      "SEND\n" +
+      "destination:/queue/secure\n" +
+      "receipt:0\n" +
+      "\n" +
+      "Hello Wolrd\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to create the queue")
+  }
+
+  test("Send authorized but not create") {
+    connect("1.1", client,
+      "login:can_send_queue\n" +
+      "passcode:can_send_queue\n")
+
+    client.write(
+      "SEND\n" +
+      "destination:/queue/secure\n" +
+      "receipt:0\n" +
+      "\n" +
+      "Hello Wolrd\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to create the queue")
+
+  }
+
+  test("Consume authorized but not create") {
+    connect("1.1", client,
+      "login:can_consume_queue\n" +
+      "passcode:can_consume_queue\n")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/secure\n" +
+      "id:0\n" +
+      "receipt:0\n" +
+      "\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to create the queue")
+  }
+
+  test("Send and create authorized") {
+    connect("1.1", client,
+      "login:can_send_create_queue\n" +
+      "passcode:can_send_create_queue\n")
+
+    client.write(
+      "SEND\n" +
+      "destination:/queue/secure\n" +
+      "receipt:0\n" +
+      "\n" +
+      "Hello Wolrd\n")
+
+    wait_for_receipt("0")
+
+  }
+
+  test("Send and create authorized via id_regex") {
+    connect("1.1", client,
+      "login:guest\n" +
+      "passcode:guest\n")
+
+    client.write(
+      "SEND\n" +
+      "destination:/queue/testblah\n" +
+      "receipt:0\n" +
+      "\n" +
+      "Hello Wolrd\n")
+
+    wait_for_receipt("0")
+
+    client.write(
+      "SEND\n" +
+      "destination:/queue/notmatch\n" +
+      "receipt:1\n" +
+      "\n" +
+      "Hello Wolrd\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to create the queue")
+  }
+
+  test("Can send and once created") {
+
+    // Now try sending with the lower access id.
+    connect("1.1", client,
+      "login:can_send_queue\n" +
+      "passcode:can_send_queue\n")
+
+    client.write(
+      "SEND\n" +
+      "destination:/queue/secure\n" +
+      "receipt:0\n" +
+      "\n" +
+      "Hello Wolrd\n")
+
+    wait_for_receipt("0")
+
+  }
+
+  test("Consume not authorized") {
+    connect("1.1", client,
+      "login:can_only_connect\n" +
+      "passcode:can_only_connect\n")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/secure\n" +
+      "id:0\n" +
+      "receipt:0\n" +
+      "\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to consume from the queue")
+  }
+
+  test("Consume authorized and JMSXUserID is set on message") {
+    connect("1.1", client,
+      "login:can_send_create_consume_queue\n" +
+      "passcode:can_send_create_consume_queue\n")
+
+    subscribe("0","/queue/sendsid")
+    async_send("/queue/sendsid", "hello")
+
+    val frame = client.receive()
+    frame should startWith("MESSAGE\n")
+    frame should include("JMSXUserID:can_send_create_consume_queue\n")
+    frame should include("sender-ip:127.0.0.1\n")
+  }
+}
+class StompSslSecurityTest extends StompTestSupport {
+
+  override def broker_config_uri: String = "xml:classpath:apollo-stomp-ssl-secure.xml"
+  override def is_parallel_test_class: Boolean = false
+
+  override def beforeAll = {
+    // System.setProperty("javax.net.debug", "all")
+    try {
+      val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
+      System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
+    } catch {
+      case x:Throwable => x.printStackTrace
+    }
+    super.beforeAll
+  }
+
+  def use_client_cert = {
+    val config = new KeyStorageDTO
+    config.file = basedir/"src"/"test"/"resources"/"client.ks"
+    config.password = "password"
+    config.key_password = "password"
+    client.key_storeage = new KeyStorage(config)
+  }
+
+  test("Connect with cert and no id password") {
+    use_client_cert
+    connect("1.1", client)
   }
 
 }
 
-class Stomp11ConnectTest extends StompTestSupport {
+/**
+ * These tests can be run in parallel against a single Apollo broker.
+ */
+class StompParallelTest extends StompTestSupport with BrokerParallelTestExecution {
+
+  def skip_if_using_store = skip(broker_config_uri.endsWith("-bdb.xml") || broker_config_uri.endsWith("-leveldb.xml"))
+
+  test("Stomp 1.0 CONNECT") {
+    connect("1.0")
+  }
 
   test("Stomp 1.1 CONNECT") {
     connect("1.1")
@@ -497,10 +813,6 @@ class Stomp11ConnectTest extends StompTe
     frame should include regex("""message:.+?\n""")
   }
 
-}
-
-class Stomp11HeartBeatTest extends StompTestSupport {
-
   test("Stomp 1.1 Broker sends heart-beat") {
 
     client.open("localhost", port)
@@ -559,54 +871,24 @@ class Stomp11HeartBeatTest extends Stomp
     }
   }
 
-}
-
-class StompPersistentQueueTest extends StompTestSupport {
-
-  override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
-
-  test("(APLO-198) Apollo sometimes does not send all the messages in a queue") {
-    connect("1.1")
-    for( i <- 0 until 10000 ) {
-      async_send("/queue/BIGQUEUE", "message #"+i)
-    }
-    sync_send("/queue/BIGQUEUE", "END")
-    client.close
-    
-    var counter = 0
-    for( i <- 0 until 100 ) {
-      connect("1.1")
-      subscribe("1", "/queue/BIGQUEUE", "client", false, "", false)
-      for( j <- 0 until 100 ) {
-        assert_received("message #"+counter)(true)
-        counter+=1
-      }
-      client.write(
-        "DISCONNECT\n" +
-        "receipt:disco\n" +
-        "\n")
-      wait_for_receipt("disco", client, true)
-      client.close
-      within(2, SECONDS) {
-        val status = queue_status("BIGQUEUE")
-        status.consumers.size() should be(0)
-      }
-    }
+  test("UDP to STOMP interop") {
 
     connect("1.1")
-    subscribe("1", "/queue/BIGQUEUE", "client")
-    assert_received("END")(true)
+    subscribe("0", "/topic/udp")
 
-  }
+    val udp_port:Int = connector_port("udp").get
+    val channel = DatagramChannel.open();
 
-}
+    val target = new InetSocketAddress("127.0.0.1", udp_port)
+    channel.send(new AsciiBuffer("Hello").toByteBuffer, target)
 
-/**
- * These disconnect tests assure that we don't drop message deliviers that are in flight
- * if a client disconnects before those deliveries are accepted by the target destination.
- */
-class StompDisconnectTest extends StompTestSupport {
+    assert_received("Hello")
+  }
 
+  /**
+   * These disconnect tests assure that we don't drop message deliviers that are in flight
+   * if a client disconnects before those deliveries are accepted by the target destination.
+   */
   test("Messages delivery assured to a queued once a disconnect receipt is received") {
 
     // figure out at what point a quota'ed queue stops accepting more messages.
@@ -685,9 +967,6 @@ class StompDisconnectTest extends StompT
     }
 
   }
-}
-
-class StompDestinationTest extends StompTestSupport {
 
   test("APLO-206 - Load balance of job queues using small consumer credit windows") {
     connect("1.1")
@@ -711,6 +990,7 @@ class StompDestinationTest extends Stomp
   }
 
   test("Browsing queues does not cause AssertionError.  Reported in APLO-156") {
+    skip_if_using_store
     connect("1.1")
     subscribe("0", "/queue/TOOL.DEFAULT")
     async_send("/queue/TOOL.DEFAULT", "1")
@@ -734,74 +1014,22 @@ class StompDestinationTest extends Stomp
     sync_send("/topic/retained-example", 3)
     subscribe("0", "/topic/retained-example")
     assert_received(2)
-    async_send("/topic/retained-example", 4)
-    assert_received(4)
-  }
-
-  test("retain:remove makes a topic forget the message") {
-    connect("1.1")
-    async_send("/topic/retained-example2", 1)
-    async_send("/topic/retained-example2", 2, "retain:set\n")
-    async_send("/topic/retained-example2", 3, "retain:remove\n")
-    subscribe("0", "/topic/retained-example2")
-    async_send("/topic/retained-example2", 4)
-    assert_received(4)
-  }
-
-  // This is the test case for https://issues.apache.org/jira/browse/APLO-88
-  test("ACK then socket close with/without DISCONNECT, should still ACK") {
-    for(i <- 1 until 3) {
-      connect("1.1")
-
-      def send(id:Int) = {
-        client.write(
-          "SEND\n" +
-          "destination:/queue/from-seq-end\n" +
-          "message-id:id-"+i+"-"+id+"\n"+
-          "receipt:0\n"+
-          "\n")
-        wait_for_receipt("0")
-      }
-
-      def get(seq:Long) = {
-        val frame = client.receive()
-        frame should startWith("MESSAGE\n")
-        frame should include("message-id:id-"+i+"-"+seq+"\n")
-        client.write(
-          "ACK\n" +
-          "subscription:0\n" +
-          "message-id:id-"+i+"-"+seq+"\n" +
-          "\n")
-      }
-
-      send(1)
-      send(2)
-
-      client.write(
-        "SUBSCRIBE\n" +
-        "destination:/queue/from-seq-end\n" +
-        "id:0\n" +
-        "ack:client\n"+
-        "\n")
-      get(1)
-      client.write(
-        "DISCONNECT\n" +
-        "\n")
-      client.close
-
-      connect("1.1")
-      client.write(
-        "SUBSCRIBE\n" +
-        "destination:/queue/from-seq-end\n" +
-        "id:0\n" +
-        "ack:client\n"+
-        "\n")
-      get(2)
-      client.close
-    }
+    async_send("/topic/retained-example", 4)
+    assert_received(4)
+  }
+
+  test("retain:remove makes a topic forget the message") {
+    connect("1.1")
+    async_send("/topic/retained-example2", 1)
+    async_send("/topic/retained-example2", 2, "retain:set\n")
+    async_send("/topic/retained-example2", 3, "retain:remove\n")
+    subscribe("0", "/topic/retained-example2")
+    async_send("/topic/retained-example2", 4)
+    assert_received(4)
   }
 
   test("Setting `from-seq` header to -1 results in subscription starting at end of the queue.") {
+    skip_if_using_store
     connect("1.1")
 
     def send(id:Int) = {
@@ -899,6 +1127,7 @@ class StompDestinationTest extends Stomp
   }
 
   test("The `from-seq` header can be used to resume delivery from a given point in a queue.") {
+    skip_if_using_store
     connect("1.1")
 
     def send(id:Int) = {
@@ -1120,6 +1349,7 @@ class StompDestinationTest extends Stomp
   }
 
   test("Queue browsers don't consume the messages") {
+    skip_if_using_store
     connect("1.1")
 
     def put(id:Int) = {
@@ -1212,7 +1442,7 @@ class StompDestinationTest extends Stomp
     def put(id:Int) = {
       client.write(
         "SEND\n" +
-        "destination:/topic/updates\n" +
+        "destination:/topic/updates1\n" +
         "\n" +
         "message:"+id+"\n")
     }
@@ -1220,7 +1450,7 @@ class StompDestinationTest extends Stomp
 
     client.write(
       "SUBSCRIBE\n" +
-      "destination:/topic/updates\n" +
+      "destination:/topic/updates1\n" +
       "id:0\n" +
       "receipt:0\n" +
       "\n")
@@ -1247,366 +1477,113 @@ class StompDestinationTest extends Stomp
     def put(id:Int) = {
       client.write(
         "SEND\n" +
-        "destination:/topic/updates\n" +
+        "destination:/topic/updates2\n" +
         "\n" +
         "message:"+id+"\n")
     }
 
     client.write(
       "SUBSCRIBE\n" +
-      "destination:/topic/updates\n" +
-      "id:my-sub-name\n" +
-      "persistent:true\n" +
-      "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0")
-    client.close
-
-    // Close him out.. since persistent:true then
-    // the topic subscription will be persistent accross client
-    // connections.
-
-    connect("1.1")
-    put(1)
-    put(2)
-    put(3)
-
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/topic/updates\n" +
+      "destination:/topic/updates2\n" +
       "id:my-sub-name\n" +
       "persistent:true\n" +
-      "\n")
-
-    def get(id:Int) = {
-      val frame = client.receive()
-      frame should startWith("MESSAGE\n")
-      frame should include ("subscription:my-sub-name\n")
-      frame should endWith regex("\n\nmessage:"+id+"\n")
-    }
-
-    get(1)
-    get(2)
-    get(3)
-  }
-
-  test("Queue and a selector") {
-    connect("1.1")
-
-    def put(id:Int, color:String) = {
-      client.write(
-        "SEND\n" +
-        "destination:/queue/selected\n" +
-        "color:"+color+"\n" +
-        "\n" +
-        "message:"+id+"\n")
-    }
-    put(1, "red")
-    put(2, "blue")
-    put(3, "red")
-
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/queue/selected\n" +
-      "selector:color='red'\n" +
-      "id:0\n" +
-      "\n")
-
-    def get(id:Int) = {
-      val frame = client.receive()
-      frame should startWith("MESSAGE\n")
-      frame should endWith regex("\n\nmessage:"+id+"\n")
-    }
-    get(1)
-    get(3)
-  }
-
-  test("Topic and a selector") {
-    connect("1.1")
-
-    def put(id:Int, color:String) = {
-      client.write(
-        "SEND\n" +
-        "destination:/topic/selected\n" +
-        "color:"+color+"\n" +
-        "\n" +
-        "message:"+id+"\n")
-    }
-
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/topic/selected\n" +
-      "selector:color='red'\n" +
-      "id:0\n" +
-      "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0")
-
-    put(1, "red")
-    put(2, "blue")
-    put(3, "red")
-
-    def get(id:Int) = {
-      val frame = client.receive()
-      frame should startWith("MESSAGE\n")
-      frame should endWith regex("\n\nmessage:"+id+"\n")
-    }
-    get(1)
-    get(3)
-  }
-
-
-}
-
-class DurableSubscriptionOnLevelDBTest extends StompTestSupport {
-
-  override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
-
-  test("Multiple dsubs contain the same messages (Test case for APLO-210)") {
-
-    val sub_count = 3
-    val message_count = 1000
-
-    // establish 3 durable subs..
-    connect("1.1")
-    for( sub <- 1 to sub_count ) {
-      subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true)
-    }
-    close()
-
-    connect("1.1")
-
-    val filler = ":"+("x"*(1024*10))
-
-    // Now send a bunch of messages....
-    for( i <- 1 to message_count ) {
-      async_send(dest="/topic/sometopic", headers="persistent:true\n", body=i+filler)
-    }
-
-    // Empty out the durable durable sub
-    for( sub <- 1 to sub_count ) {
-      subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true, sync=false)
-      for( i <- 1 to message_count ) {
-        assert_received(body=i+filler, sub="sub"+sub)
-      }
-    }
-
-  }
-
-  test("Can directly send an recieve from a durable sub") {
-    connect("1.1")
-
-    // establish 2 durable subs..
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/topic/sometopic\n" +
-      "id:sub1\n" +
-      "persistent:true\n" +
-      "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0")
-
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/topic/sometopic\n" +
-      "id:sub2\n" +
-      "persistent:true\n" +
-      "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0")
-
-    client.close
-    connect("1.1")
-
-    // Now send a bunch of messages....
-    // Send only to sub 1
-    client.write(
-      "SEND\n" +
-      "destination:/dsub/sub1\n" +
-      "\n" +
-      "sub1 msg\n")
-
-    // Send to all subs
-    client.write(
-      "SEND\n" +
-      "destination:/topic/sometopic\n" +
-      "\n" +
-      "LAST\n")
-
-
-    // Now try to get all the previously sent messages.
-    def get(expected:String) = {
-      val frame = client.receive()
-      frame should startWith("MESSAGE\n")
-      frame should endWith("\n\n"+expected)
-    }
-
-    // Empty out the first durable sub
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/dsub/sub1\n" +
-      "id:1\n" +
-      "\n")
-
-    get("sub1 msg\n")
-    get("LAST\n")
-
-    // Empty out the 2nd durable sub
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/dsub/sub2\n" +
-      "id:2\n" +
-      "\n")
-
-    get("LAST\n")
-  }
-  test("You can connect and then unsubscribe from existing durable sub (APLO-157)") {
-    connect("1.1")
-    subscribe("APLO-157", "/topic/APLO-157", "auto", true)
-    client.close()
-
-    // Make sure the durable sub exists.
-    connect("1.1")
-    sync_send("/topic/APLO-157", "1")
-    subscribe("APLO-157", "/topic/APLO-157", "client", true)
-    assert_received("1")
-    client.close()
-
-    // Delete the durable sub..
-    connect("1.1")
-    unsubscribe("APLO-157", "persistent:true\n")
-    client.close()
-
-    // Make sure the durable sub does not exists.
-    connect("1.1")
-    subscribe("APLO-157", "/topic/APLO-157", "client", true)
-    async_send("/topic/APLO-157", "2")
-    assert_received("2")
-    unsubscribe("APLO-157", "persistent:true\n")
-
-  }
-
-  test("Can create dsubs with dots in them") {
-    connect("1.1")
-
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/topic/sometopic\n" +
-      "id:sub.1\n" +
-      "persistent:true\n" +
-      "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0")
-
-    client.write(
-      "SEND\n" +
-      "destination:/dsub/sub.1\n" +
       "receipt:0\n" +
-      "\n" +
-      "content\n")
+      "\n")
     wait_for_receipt("0")
+    client.close
 
-  }
+    // Close him out.. since persistent:true then
+    // the topic subscription will be persistent accross client
+    // connections.
 
-  test("Duplicate SUBSCRIBE updates durable subscription bindings") {
     connect("1.1")
+    put(1)
+    put(2)
+    put(3)
 
     client.write(
       "SUBSCRIBE\n" +
-      "destination:/topic/a\n" +
-      "id:sub1\n" +
+      "destination:/topic/updates2\n" +
+      "id:my-sub-name\n" +
       "persistent:true\n" +
-      "receipt:0\n" +
       "\n")
-    wait_for_receipt("0")
 
-    def get(expected:String) = {
+    def get(id:Int) = {
       val frame = client.receive()
       frame should startWith("MESSAGE\n")
-      frame should endWith("\n\n"+expected)
+      frame should include ("subscription:my-sub-name\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
     }
 
-    // Validate that the durable sub is bound to /topic/a
-    client.write(
-      "SEND\n" +
-      "destination:/topic/a\n" +
-      "\n" +
-      "1\n")
-    get("1\n")
-
-    client.write(
-      "UNSUBSCRIBE\n" +
-      "id:sub1\n" +
-      "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0")
+    get(1)
+    get(2)
+    get(3)
+  }
 
-    // Switch the durable sub to /topic/b
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/topic/b\n" +
-      "id:sub1\n" +
-      "persistent:true\n" +
-      "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0")
+  test("Queue and a selector") {
+    connect("1.1")
 
-    // all these should get dropped
-    for ( i <- 1 to 500 ) {
+    def put(id:Int, color:String) = {
       client.write(
         "SEND\n" +
-        "destination:/topic/a\n" +
+        "destination:/queue/selected\n" +
+        "color:"+color+"\n" +
         "\n" +
-        "DROPPED\n")
+        "message:"+id+"\n")
     }
+    put(1, "red")
+    put(2, "blue")
+    put(3, "red")
 
-    // Not this one.. it's on the updated topic
     client.write(
-      "SEND\n" +
-      "destination:/topic/b\n" +
-      "\n" +
-      "2\n")
-    get("2\n")
+      "SUBSCRIBE\n" +
+      "destination:/queue/selected\n" +
+      "selector:color='red'\n" +
+      "id:0\n" +
+      "\n")
 
+    def get(id:Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+    get(1)
+    get(3)
   }
 
-  test("Direct send to a non-existant a durable sub fails") {
+  test("Topic and a selector") {
     connect("1.1")
 
-    client.write(
-      "SEND\n" +
-      "destination:/dsub/doesnotexist\n" +
-      "receipt:0\n" +
-      "\n" +
-      "content\n")
-
-    val frame = client.receive()
-    frame should startWith("ERROR\n")
-    frame should include("message:The destination does not exist")
-  }
-
-  test("Direct subscribe to a non-existant a durable sub fails") {
-    connect("1.1")
+    def put(id:Int, color:String) = {
+      client.write(
+        "SEND\n" +
+        "destination:/topic/selected\n" +
+        "color:"+color+"\n" +
+        "\n" +
+        "message:"+id+"\n")
+    }
 
     client.write(
       "SUBSCRIBE\n" +
-      "destination:/dsub/doesnotexist\n" +
-      "id:1\n" +
+      "destination:/topic/selected\n" +
+      "selector:color='red'\n" +
+      "id:0\n" +
       "receipt:0\n" +
       "\n")
+    wait_for_receipt("0")
 
-    val frame = client.receive()
-    frame should startWith("ERROR\n")
-    frame should include("message:Durable subscription does not exist")
+    put(1, "red")
+    put(2, "blue")
+    put(3, "red")
 
+    def get(id:Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex("\n\nmessage:"+id+"\n")
+    }
+    get(1)
+    get(3)
   }
-}
-
-class DurableSubscriptionOnBDBTest extends DurableSubscriptionOnLevelDBTest {
-  override def broker_config_uri: String = "xml:classpath:apollo-stomp-bdb.xml"
-}
-
-class StompMirroredQueueTest extends StompTestSupport {
 
   test("Topic gets copy of message sent to queue") {
     connect("1.1")
@@ -1678,22 +1655,45 @@ class StompMirroredQueueTest extends Sto
     get(2)
   }
 
+  def path_separator = "."
 
-}
+  test("Messages Expire") {
+    connect("1.1")
 
-class StompSslDestinationTest extends StompDestinationTest {
-  override def broker_config_uri: String = "xml:classpath:apollo-stomp-ssl.xml"
+    def put(msg:String, ttl:Option[Long]=None) = {
+      val expires_header = ttl.map(t=> "expires:"+(System.currentTimeMillis()+t)+"\n").getOrElse("")
+      client.write(
+        "SEND\n" +
+        expires_header +
+        "destination:/queue/exp\n" +
+        "\n" +
+        "message:"+msg+"\n")
+    }
 
-  val config = new KeyStorageDTO
-  config.file = basedir/"src"/"test"/"resources"/"client.ks"
-  config.password = "password"
-  config.key_password = "password"
+    put("1")
+    put("2", Some(1000L))
+    put("3")
 
-  client.key_storeage = new KeyStorage(config)
+    Thread.sleep(2000)
 
-}
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/exp\n" +
+      "id:1\n" +
+      "receipt:0\n"+
+      "\n")
+    wait_for_receipt("0")
+
+
+    def get(dest:String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
+    }
 
-class StompReceiptTest extends StompTestSupport {
+    get("1")
+    get("3")
+  }
 
   test("Receipts on SEND to unconsummed topic") {
     connect("1.1")
@@ -1742,11 +1742,9 @@ class StompReceiptTest extends StompTest
     put(2)
     wait_for_receipt("1")
     wait_for_receipt("2")
-    
+
   }
-}
-class StompTransactionTest extends StompTestSupport {
-  
+
   test("Transacted commit after unsubscribe"){
     val producer = new StompClient
     val consumer = new StompClient
@@ -1872,11 +1870,6 @@ class StompTransactionTest extends Stomp
 
   }
 
-}
-
-
-class StompAckModeTest extends StompTestSupport {
-
   test("ack:client redelivers on client disconnect") {
     connect("1.1")
 
@@ -1936,8 +1929,8 @@ class StompAckModeTest extends StompTest
       "id:0\n" +
       "\n")
     get(3)
-    
-    
+
+
   }
 
 
@@ -2002,593 +1995,526 @@ class StompAckModeTest extends StompTest
     get(1)
     get(3)
 
-
   }
 
-}
-
-class StompSecurityTest extends StompTestSupport {
-
-  override def broker_config_uri: String = "xml:classpath:apollo-stomp-secure.xml"
+  test("Temp Queue Send Receive") {
+    connect("1.1")
 
-  override def beforeAll = {
-    try {
-      val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
-      System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
-    } catch {
-      case x:Throwable => x.printStackTrace
+    def put(msg:String) = {
+      client.write(
+        "SEND\n" +
+        "destination:/temp-queue/test\n" +
+        "reply-to:/temp-queue/test\n" +
+        "receipt:0\n" +
+        "\n" +
+        "message:"+msg+"\n")
+      wait_for_receipt("0")
     }
-    super.beforeAll
-  }
-
-  test("Connect with valid id password but can't connect") {
-
-    val frame = connect_request("1.1", client,
-      "login:can_not_connect\n" +
-      "passcode:can_not_connect\n")
-    frame should startWith("ERROR\n")
-    frame should include("message:Not authorized to connect")
-
-  }
-
-  test("Connect with no id password") {
-    val frame = connect_request("1.1", client)
-    frame should startWith("ERROR\n")
-    frame should include("message:Authentication failed.")
-  }
-
-  test("Connect with invalid id password") {
-    val frame = connect_request("1.1", client,
-      "login:foo\n" +
-      "passcode:bar\n")
-    frame should startWith("ERROR\n")
-    frame should include("message:Authentication failed.")
-
-  }
-
-  test("Connect with valid id password that can connect") {
-    connect("1.1", client,
-      "login:can_only_connect\n" +
-      "passcode:can_only_connect\n")
-
-  }
-
-  test("Connector restricted user on the right connector") {
-    connect("1.1", client,
-      "login:connector_restricted\n" +
-      "passcode:connector_restricted\n", "tcp2")
-  }
-
-  test("Connector restricted user on the wrong connector") {
-    val frame = connect_request("1.1", client,
-      "login:connector_restricted\n" +
-      "passcode:connector_restricted\n", "tcp")
-    frame should startWith("ERROR\n")
-    frame should include("message:Not authorized to connect to connector 'tcp'.")
-  }
-
-  test("Send not authorized") {
-    connect("1.1", client,
-      "login:can_only_connect\n" +
-      "passcode:can_only_connect\n")
+
+    put("1")
 
     client.write(
-      "SEND\n" +
-      "destination:/queue/secure\n" +
-      "receipt:0\n" +
-      "\n" +
-      "Hello Wolrd\n")
+      "SUBSCRIBE\n" +
+      "destination:/temp-queue/test\n" +
+      "id:1\n" +
+      "\n")
 
-    val frame = client.receive()
-    frame should startWith("ERROR\n")
-    frame should include("message:Not authorized to create the queue")
-  }
+    def get(dest:String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
 
-  test("Send authorized but not create") {
-    connect("1.1", client,
-      "login:can_send_queue\n" +
-      "passcode:can_send_queue\n")
+      // extract headers as a map of values.
+      Map((frame.split("\n").reverse.flatMap { line =>
+        if( line.contains(":") ) {
+          val parts = line.split(":", 2)
+          Some((parts(0), parts(1)))
+        } else {
+          None
+        }
+      }):_*)
+    }
 
-    client.write(
+    // The destination and reply-to headers should get updated with actual
+    // Queue names
+    val message = get("1")
+    val actual_temp_dest_name = message.get("destination").get
+    actual_temp_dest_name should startWith("/queue/temp.default.")
+    message.get("reply-to") should be === ( message.get("destination") )
+
+    // Different connection should be able to send a message to the temp destination..
+    var other = new StompClient
+    connect("1.1", other)
+    other.write(
       "SEND\n" +
-      "destination:/queue/secure\n" +
+      "destination:"+actual_temp_dest_name+"\n" +
       "receipt:0\n" +
-      "\n" +
-      "Hello Wolrd\n")
+      "\n")
+    wait_for_receipt("0", other)
 
-    val frame = client.receive()
+    // First client chould get the message.
+    var frame = client.receive()
+    frame should startWith("MESSAGE\n")
+
+    // But not consume from it.
+    other.write(
+      "SUBSCRIBE\n" +
+      "destination:"+actual_temp_dest_name+"\n" +
+      "id:1\n" +
+      "receipt:0\n" +
+      "\n")
+    frame = other.receive()
     frame should startWith("ERROR\n")
-    frame should include("message:Not authorized to create the queue")
+    frame should include regex("""message:Not authorized to receive from the temporary destination""")
+    other.close()
+
+    // Check that temp queue is deleted once the client disconnects
+    put("2")
+    expect(true)(queue_exists(actual_temp_dest_name.stripPrefix("/queue/")))
+    client.close();
 
+    within(10, SECONDS) {
+      expect(false)(queue_exists(actual_temp_dest_name.stripPrefix("/queue/")))
+    }
   }
 
-  test("Consume authorized but not create") {
-    connect("1.1", client,
-      "login:can_consume_queue\n" +
-      "passcode:can_consume_queue\n")
+  test("Temp Topic Send Receive") {
+    connect("1.1")
 
     client.write(
       "SUBSCRIBE\n" +
-      "destination:/queue/secure\n" +
-      "id:0\n" +
-      "receipt:0\n" +
+      "destination:/temp-topic/test\n" +
+      "id:1\n" +
       "\n")
 
-    val frame = client.receive()
-    frame should startWith("ERROR\n")
-    frame should include("message:Not authorized to create the queue")
-  }
+    def get(dest:String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
 
-  test("Send and create authorized") {
-    connect("1.1", client,
-      "login:can_send_create_queue\n" +
-      "passcode:can_send_create_queue\n")
+      // extract headers as a map of values.
+      Map((frame.split("\n").reverse.flatMap { line =>
+        if( line.contains(":") ) {
+          val parts = line.split(":", 2)
+          Some((parts(0), parts(1)))
+        } else {
+          None
+        }
+      }):_*)
+    }
 
-    client.write(
-      "SEND\n" +
-      "destination:/queue/secure\n" +
-      "receipt:0\n" +
-      "\n" +
-      "Hello Wolrd\n")
+    def put(msg:String) = {
+      client.write(
+        "SEND\n" +
+        "destination:/temp-topic/test\n" +
+        "reply-to:/temp-topic/test\n" +
+        "receipt:0\n" +
+        "\n" +
+        "message:"+msg+"\n")
+      wait_for_receipt("0", client)
+    }
+    put("1")
 
-    wait_for_receipt("0")
+    // The destination and reply-to headers should get updated with actual
+    // Queue names
+    val message = get("1")
+    val actual_temp_dest_name = message.get("destination").get
+    actual_temp_dest_name should startWith("/topic/temp.default.")
+    message.get("reply-to") should be === ( message.get("destination") )
 
-  }
+    // Different connection should be able to send a message to the temp destination..
+    var other = new StompClient
+    connect("1.1", other)
+    other.write(
+      "SEND\n" +
+      "destination:"+actual_temp_dest_name+"\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0", other)
 
-  test("Send and create authorized via id_regex") {
-    connect("1.1", client,
-      "login:guest\n" +
-      "passcode:guest\n")
+    // First client chould get the message.
+    var frame = client.receive()
+    frame should startWith("MESSAGE\n")
 
-    client.write(
-      "SEND\n" +
-      "destination:/queue/testblah\n" +
+    // But not consume from it.
+    other.write(
+      "SUBSCRIBE\n" +
+      "destination:"+actual_temp_dest_name+"\n" +
+      "id:1\n" +
       "receipt:0\n" +
-      "\n" +
-      "Hello Wolrd\n")
+      "\n")
+    frame = other.receive()
+    frame should startWith("ERROR\n")
+    frame should include regex("""message:Not authorized to receive from the temporary destination""")
+    other.close()
 
-    wait_for_receipt("0")
+    // Check that temp queue is deleted once the client disconnects
+    put("2")
+    expect(true)(topic_exists(actual_temp_dest_name.stripPrefix("/topic/")))
+    client.close();
 
-    client.write(
-      "SEND\n" +
-      "destination:/queue/notmatch\n" +
-      "receipt:1\n" +
-      "\n" +
-      "Hello Wolrd\n")
+    within(10, SECONDS) {
+      expect(false)(topic_exists(actual_temp_dest_name.stripPrefix("/topic/")))
+    }
 
-    val frame = client.receive()
-    frame should startWith("ERROR\n")
-    frame should include("message:Not authorized to create the queue")
-  }
 
-  test("Can send and once created") {
+  }
 
-    // Now try sending with the lower access id.
-    connect("1.1", client,
-      "login:can_send_queue\n" +
-      "passcode:can_send_queue\n")
+  test("Odd reply-to headers do not cause errors") {
+    connect("1.1")
 
     client.write(
       "SEND\n" +
-      "destination:/queue/secure\n" +
+      "destination:/queue/oddrepyto\n" +
+      "reply-to:sms:8139993334444\n" +
       "receipt:0\n" +
-      "\n" +
-      "Hello Wolrd\n")
-
+      "\n")
     wait_for_receipt("0")
 
-  }
-
-  test("Consume not authorized") {
-    connect("1.1", client,
-      "login:can_only_connect\n" +
-      "passcode:can_only_connect\n")
-
     client.write(
       "SUBSCRIBE\n" +
-      "destination:/queue/secure\n" +
-      "id:0\n" +
-      "receipt:0\n" +
+      "destination:/queue/oddrepyto\n" +
+      "id:1\n" +
       "\n")
 
     val frame = client.receive()
-    frame should startWith("ERROR\n")
-    frame should include("message:Not authorized to consume from the queue")
-  }
-
-  test("Consume authorized and JMSXUserID is set on message") {
-    connect("1.1", client,
-      "login:can_send_create_consume_queue\n" +
-      "passcode:can_send_create_consume_queue\n")
-
-    subscribe("0","/queue/sendsid")
-    async_send("/queue/sendsid", "hello")
-
-    val frame = client.receive()
     frame should startWith("MESSAGE\n")
-    frame should include("JMSXUserID:can_send_create_consume_queue\n")
-    frame should include("sender-ip:127.0.0.1\n")
+    frame should include("reply-to:sms:8139993334444\n")
   }
-}
-
-class StompSslSecurityTest extends StompTestSupport {
 
-  override def broker_config_uri: String = "xml:classpath:apollo-stomp-ssl-secure.xml"
+  test("NACKing moves messages to DLQ (non-persistent)") {
+    connect("1.1")
+    sync_send("/queue/nacker.a", "this msg is not persistent")
 
-  override def beforeAll = {
-    // System.setProperty("javax.net.debug", "all")
-    try {
-      val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
-      System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
-    } catch {
-      case x:Throwable => x.printStackTrace
-    }
-    super.beforeAll
-  }
+    subscribe("0", "/queue/nacker.a", "client", false, "", false)
+    subscribe("dlq", "/queue/dlq.nacker.a", "auto", false, "", false)
+    var ack = assert_received("this msg is not persistent", "0")
+    ack(false)
+    ack = assert_received("this msg is not persistent", "0")
+    ack(false)
 
-  def use_client_cert = {
-    val config = new KeyStorageDTO
-    config.file = basedir/"src"/"test"/"resources"/"client.ks"
-    config.password = "password"
-    config.key_password = "password"
-    client.key_storeage = new KeyStorage(config)
+    // It should be sent to the DLQ after the 2nd nak
+    assert_received("this msg is not persistent", "dlq")
   }
 
-  test("Connect with cert and no id password") {
-    use_client_cert
-    connect("1.1", client)
+  test("NACKing moves messages to DLQ (persistent)") {
+    connect("1.1")
+    sync_send("/queue/nacker.b", "this msg is persistent", "persistent:true\n")
+
+    subscribe("0", "/queue/nacker.b", "client", false, "", false)
+    subscribe("dlq", "/queue/dlq.nacker.b", "auto", false, "", false)
+    var ack = assert_received("this msg is persistent", "0")
+    ack(false)
+    ack = assert_received("this msg is persistent", "0")
+    ack(false)
+
+    // It should be sent to the DLQ after the 2nd nak
+    assert_received("this msg is persistent", "dlq")
   }
 
-}
+  test("NACKing without DLQ consumer (persistent)"){
+    connect("1.1")
+    sync_send("/queue/nacker.c", "this msg is persistent", "persistent:true\n")
 
-class StompWildcardTest extends StompTestSupport {
+    subscribe("0", "/queue/nacker.c", "client", false, "", false)
 
-  def path_separator = "."
+    var ack = assert_received("this msg is persistent", "0")
+    ack(false)
+    ack = assert_received("this msg is persistent", "0")
+    ack(false)
+    Thread.sleep(1000)
+  }
 
-  test("Wildcard subscription") {
-    connect("1.1")
 
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/queue/foo"+path_separator+"*\n" +
-      "id:1\n" +
-      "receipt:0\n"+
-      "\n")
+}
+class StompLevelDBParallelTest extends StompParallelTest with BrokerParallelTestExecution {
 
-    wait_for_receipt("0")
+  override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
 
-    def put(dest:String) = {
-      client.write(
-        "SEND\n" +
-        "destination:/queue/"+dest+"\n" +
-        "\n" +
-        "message:"+dest+"\n")
+  test("(APLO-198) Apollo sometimes does not send all the messages in a queue") {
+    skip_if_using_store
+    connect("1.1")
+    for( i <- 0 until 10000 ) {
+      async_send("/queue/BIGQUEUE", "message #"+i)
     }
+    sync_send("/queue/BIGQUEUE", "END")
+    client.close
 
-    def get(dest:String) = {
-      val frame = client.receive()
-      frame should startWith("MESSAGE\n")
-      frame should endWith("\n\nmessage:%s\n".format(dest))
+    var counter = 0
+    for( i <- 0 until 100 ) {
+      connect("1.1")
+      subscribe("1", "/queue/BIGQUEUE", "client", false, "", false)
+      for( j <- 0 until 100 ) {
+        assert_received("message #"+counter)(true)
+        counter+=1
+      }
+      client.write(
+        "DISCONNECT\n" +
+        "receipt:disco\n" +
+        "\n")
+      wait_for_receipt("disco", client, true)
+      client.close
+      within(2, SECONDS) {
+        val status = queue_status("BIGQUEUE")
+        status.consumers.size() should be(0)
+      }
     }
 
-    // We should not get this one..
-    put("bar"+path_separator+"a")
-
-    put("foo"+path_separator+"a")
-    get("foo"+path_separator+"a")
+    connect("1.1")
+    subscribe("1", "/queue/BIGQUEUE", "client")
+    assert_received("END")(true)
 
-    put("foo"+path_separator+"b")
-    get("foo"+path_separator+"b")
   }
-}
 
-class CustomStompWildcardTest extends StompWildcardTest {
-  override def broker_config_uri: String = "xml:classpath:apollo-stomp-custom-dest-delimiters.xml"
-  override def path_separator = "/"
-}
+  test("Multiple dsubs contain the same messages (Test case for APLO-210)") {
+    skip_if_using_store
 
-class StompExpirationTest extends StompTestSupport {
+    val sub_count = 3
+    val message_count = 1000
 
-  def path_separator = "."
+    // establish 3 durable subs..
+    connect("1.1")
+    for( sub <- 1 to sub_count ) {
+      subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true)
+    }
+    close()
 
-  test("Messages Expire") {
     connect("1.1")
 
-    def put(msg:String, ttl:Option[Long]=None) = {
-      val expires_header = ttl.map(t=> "expires:"+(System.currentTimeMillis()+t)+"\n").getOrElse("")
-      client.write(
-        "SEND\n" +
-        expires_header +
-        "destination:/queue/exp\n" +
-        "\n" +
-        "message:"+msg+"\n")
+    val filler = ":"+("x"*(1024*10))
+
+    // Now send a bunch of messages....
+    for( i <- 1 to message_count ) {
+      async_send(dest="/topic/sometopic", headers="persistent:true\n", body=i+filler)
     }
 
-    put("1")
-    put("2", Some(1000L))
-    put("3")
+    // Empty out the durable durable sub
+    for( sub <- 1 to sub_count ) {
+      subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true, sync=false)
+      for( i <- 1 to message_count ) {
+        assert_received(body=i+filler, sub="sub"+sub)
+      }
+    }
 
-    Thread.sleep(2000)
+  }
+
+  test("Can directly send an recieve from a durable sub") {
+    skip_if_using_store
+    connect("1.1")
 
+    // establish 2 durable subs..
     client.write(
       "SUBSCRIBE\n" +
-      "destination:/queue/exp\n" +
-      "id:1\n" +
-      "receipt:0\n"+
+      "destination:/topic/sometopic\n" +
+      "id:sub1\n" +
+      "persistent:true\n" +
+      "receipt:0\n" +
       "\n")
     wait_for_receipt("0")
 
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/topic/sometopic\n" +
+      "id:sub2\n" +
+      "persistent:true\n" +
+      "receipt:0\n" +
+      "\n")
+    wait_for_receipt("0")
 
-    def get(dest:String) = {
-      val frame = client.receive()
-      frame should startWith("MESSAGE\n")
-      frame should endWith("\n\nmessage:%s\n".format(dest))
-    }
-
-    get("1")
-    get("3")
-  }
-}
+    client.close
+    connect("1.1")
 
-class StompTempDestinationTest extends StompTestSupport {
+    // Now send a bunch of messages....
+    // Send only to sub 1
+    client.write(
+      "SEND\n" +
+      "destination:/dsub/sub1\n" +
+      "\n" +
+      "sub1 msg\n")
 
-  def path_separator = "."
+    // Send to all subs
+    client.write(
+      "SEND\n" +
+      "destination:/topic/sometopic\n" +
+      "\n" +
+      "LAST\n")
 
-  test("Temp Queue Send Receive") {
-    connect("1.1")
 
-    def put(msg:String) = {
-      client.write(
-        "SEND\n" +
-        "destination:/temp-queue/test\n" +
-        "reply-to:/temp-queue/test\n" +
-        "receipt:0\n" +
-        "\n" +
-        "message:"+msg+"\n")
-      wait_for_receipt("0")
+    // Now try to get all the previously sent messages.
+    def get(expected:String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\n"+expected)
     }
 
-    put("1")
-
+    // Empty out the first durable sub
     client.write(
       "SUBSCRIBE\n" +
-      "destination:/temp-queue/test\n" +
+      "destination:/dsub/sub1\n" +
       "id:1\n" +
       "\n")
 
-    def get(dest:String) = {
-      val frame = client.receive()
-      frame should startWith("MESSAGE\n")
-      frame should endWith("\n\nmessage:%s\n".format(dest))
+    get("sub1 msg\n")
+    get("LAST\n")
 
-      // extract headers as a map of values.
-      Map((frame.split("\n").reverse.flatMap { line =>
-        if( line.contains(":") ) {
-          val parts = line.split(":", 2)
-          Some((parts(0), parts(1)))
-        } else {
-          None
-        }
-      }):_*)
-    }
+    // Empty out the 2nd durable sub
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/dsub/sub2\n" +
+      "id:2\n" +
+      "\n")
 
-    // The destination and reply-to headers should get updated with actual
-    // Queue names
-    val message = get("1")
-    val actual_temp_dest_name = message.get("destination").get
-    actual_temp_dest_name should startWith("/queue/temp.default.")
-    message.get("reply-to") should be === ( message.get("destination") )
+    get("LAST\n")
+  }
+  test("You can connect and then unsubscribe from existing durable sub (APLO-157)") {
+    skip_if_using_store
+    connect("1.1")
+    subscribe("APLO-157", "/topic/APLO-157", "auto", true)
+    client.close()
 
-    // Different connection should be able to send a message to the temp destination..
-    var other = new StompClient
-    connect("1.1", other)
-    other.write(
-      "SEND\n" +
-      "destination:"+actual_temp_dest_name+"\n" +
-      "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0", other)
+    // Make sure the durable sub exists.
+    connect("1.1")
+    sync_send("/topic/APLO-157", "1")
+    subscribe("APLO-157", "/topic/APLO-157", "client", true)
+    assert_received("1")
+    client.close()
 
-    // First client chould get the message.
-    var frame = client.receive()
-    frame should startWith("MESSAGE\n")
+    // Delete the durable sub..
+    connect("1.1")
+    unsubscribe("APLO-157", "persistent:true\n")
+    client.close()
 
-    // But not consume from it.
-    other.write(
+    // Make sure the durable sub does not exists.
+    connect("1.1")
+    subscribe("APLO-157", "/topic/APLO-157", "client", true)
+    async_send("/topic/APLO-157", "2")
+    assert_received("2")
+    unsubscribe("APLO-157", "persistent:true\n")
+
+  }
+
+  test("Can create dsubs with dots in them") {
+    connect("1.1")
+
+    client.write(
       "SUBSCRIBE\n" +
-      "destination:"+actual_temp_dest_name+"\n" +
-      "id:1\n" +
+      "destination:/topic/sometopic\n" +
+      "id:sub.1\n" +
+      "persistent:true\n" +
       "receipt:0\n" +
       "\n")
-    frame = other.receive()
-    frame should startWith("ERROR\n")
-    frame should include regex("""message:Not authorized to receive from the temporary destination""")
-    other.close()
+    wait_for_receipt("0")
 
-    // Check that temp queue is deleted once the client disconnects
-    put("2")
-    expect(true)(queue_exists(actual_temp_dest_name.stripPrefix("/queue/")))
-    client.close();
+    client.write(
+      "SEND\n" +
+      "destination:/dsub/sub.1\n" +
+      "receipt:0\n" +
+      "\n" +
+      "content\n")
+    wait_for_receipt("0")
 
-    within(10, SECONDS) {
-      expect(false)(queue_exists(actual_temp_dest_name.stripPrefix("/queue/")))
-    }
   }
 
-  test("Temp Topic Send Receive") {
+  test("Duplicate SUBSCRIBE updates durable subscription bindings") {
+    skip_if_using_store
     connect("1.1")
 
     client.write(
       "SUBSCRIBE\n" +
-      "destination:/temp-topic/test\n" +
-      "id:1\n" +
+      "destination:/topic/a\n" +
+      "id:sub1\n" +
+      "persistent:true\n" +
+      "receipt:0\n" +
       "\n")
+    wait_for_receipt("0")
 
-    def get(dest:String) = {
+    def get(expected:String) = {
       val frame = client.receive()
       frame should startWith("MESSAGE\n")
-      frame should endWith("\n\nmessage:%s\n".format(dest))
-
-      // extract headers as a map of values.
-      Map((frame.split("\n").reverse.flatMap { line =>
-        if( line.contains(":") ) {
-          val parts = line.split(":", 2)
-          Some((parts(0), parts(1)))
-        } else {
-          None
-        }
-      }):_*)
-    }
-
-    def put(msg:String) = {
-      client.write(
-        "SEND\n" +
-        "destination:/temp-topic/test\n" +
-        "reply-to:/temp-topic/test\n" +
-        "receipt:0\n" +
-        "\n" +
-        "message:"+msg+"\n")
-      wait_for_receipt("0", client)
+      frame should endWith("\n\n"+expected)
     }
-    put("1")
-
-    // The destination and reply-to headers should get updated with actual
-    // Queue names
-    val message = get("1")
-    val actual_temp_dest_name = message.get("destination").get
-    actual_temp_dest_name should startWith("/topic/temp.default.")
-    message.get("reply-to") should be === ( message.get("destination") )
 
-    // Different connection should be able to send a message to the temp destination..
-    var other = new StompClient
-    connect("1.1", other)
-    other.write(
+    // Validate that the durable sub is bound to /topic/a
+    client.write(
       "SEND\n" +
-      "destination:"+actual_temp_dest_name+"\n" +
+      "destination:/topic/a\n" +
+      "\n" +
+      "1\n")
+    get("1\n")
+
+    client.write(
+      "UNSUBSCRIBE\n" +
+      "id:sub1\n" +
       "receipt:0\n" +
       "\n")
-    wait_for_receipt("0", other)
-
-    // First client chould get the message.
-    var frame = client.receive()
-    frame should startWith("MESSAGE\n")
+    wait_for_receipt("0")
 
-    // But not consume from it.
-    other.write(
+    // Switch the durable sub to /topic/b
+    client.write(
       "SUBSCRIBE\n" +
-      "destination:"+actual_temp_dest_name+"\n" +
-      "id:1\n" +
+      "destination:/topic/b\n" +
+      "id:sub1\n" +
+      "persistent:true\n" +
       "receipt:0\n" +
       "\n")
-    frame = other.receive()
-    frame should startWith("ERROR\n")
-    frame should include regex("""message:Not authorized to receive from the temporary destination""")
-    other.close()
-
-    // Check that temp queue is deleted once the client disconnects
-    put("2")
-    expect(true)(topic_exists(actual_temp_dest_name.stripPrefix("/topic/")))
-    client.close();
+    wait_for_receipt("0")
 
-    within(10, SECONDS) {
-      expect(false)(topic_exists(actual_temp_dest_name.stripPrefix("/topic/")))
+    // all these should get dropped
+    for ( i <- 1 to 500 ) {
+      client.write(
+        "SEND\n" +
+        "destination:/topic/a\n" +
+        "\n" +
+        "DROPPED\n")
     }
 
+    // Not this one.. it's on the updated topic
+    client.write(
+      "SEND\n" +
+      "destination:/topic/b\n" +
+      "\n" +
+      "2\n")
+    get("2\n")
 
   }
 
-
-  test("Odd reply-to headers do not cause errors") {
+  test("Direct send to a non-existant a durable sub fails") {
     connect("1.1")
 
     client.write(
       "SEND\n" +
-      "destination:/queue/oddrepyto\n" +
-      "reply-to:sms:8139993334444\n" +
+      "destination:/dsub/doesnotexist\n" +
       "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0")
-
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/queue/oddrepyto\n" +
-      "id:1\n" +
-      "\n")
+      "\n" +
+      "content\n")
 
     val frame = client.receive()
-    frame should startWith("MESSAGE\n")
-    frame should include("reply-to:sms:8139993334444\n")
+    frame should startWith("ERROR\n")
+    frame should include("message:The destination does not exist")
   }
-}
-
-class StompUdpInteropTest extends StompTestSupport {
 
-  test("UDP to STOMP interop") {
-    
+  test("Direct subscribe to a non-existant a durable sub fails") {
     connect("1.1")
-    subscribe("0", "/topic/udp")
-
-    val udp_port:Int = connector_port("udp").get
-    val channel = DatagramChannel.open();
-
-    val target = new InetSocketAddress("127.0.0.1", udp_port)
-    channel.send(new AsciiBuffer("Hello").toByteBuffer, target)
-
-    assert_received("Hello")
-  }
-}
 
-class StompNackTest extends StompTestSupport {
-
-  test("NACKing moves messages to DLQ (non-persistent)") {
-    connect("1.1")
-    sync_send("/queue/nacker.a", "this msg is not persistent")
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/dsub/doesnotexist\n" +
+      "id:1\n" +
+      "receipt:0\n" +
+      "\n")
 
-    subscribe("0", "/queue/nacker.a", "client", false, "", false)
-    subscribe("dlq", "/queue/dlq.nacker.a", "auto", false, "", false)
-    var ack = assert_received("this msg is not persistent", "0")
-    ack(false)
-    ack = assert_received("this msg is not persistent", "0")
-    ack(false)
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Durable subscription does not exist")
 
-    // It should be sent to the DLQ after the 2nd nak
-    assert_received("this msg is not persistent", "dlq")
   }
 
-  test("NACKing moves messages to DLQ (persistent)") {
-    connect("1.1")
-    sync_send("/queue/nacker.b", "this msg is persistent", "persistent:true\n")
-
-    subscribe("0", "/queue/nacker.b", "client", false, "", false)
-    subscribe("dlq", "/queue/dlq.nacker.b", "auto", false, "", false)
-    var ack = assert_received("this msg is persistent", "0")
-    ack(false)
-    ack = assert_received("this msg is persistent", "0")
-    ack(false)
-
-    // It should be sent to the DLQ after the 2nd nak
-    assert_received("this msg is persistent", "dlq")
-  }
 }
-
-class StompNackTestOnLevelDBTest extends StompNackTest {
-  override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
-
-  test("NACKing without DLQ consumer (persistent)"){
-    connect("1.1")
-    sync_send("/queue/nacker.b", "this msg is persistent", "persistent:true\n")
-
-    subscribe("0", "/queue/nacker.b", "client", false, "", false)
-
-    var ack = assert_received("this msg is persistent", "0")
-    ack(false)
-    ack = assert_received("this msg is persistent", "0")
-    ack(false)
-    Thread.sleep(1000)
-  }
+class StompBDBParallelTest extends StompLevelDBParallelTest {
+  override def broker_config_uri: String = "xml:classpath:apollo-stomp-bdb.xml"
 }
 
-class StompDropPolicyTest extends StompTestSupport {
+class StompDropPolicyTest extends StompTestSupport with BrokerParallelTestExecution {
 
   override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
 
@@ -2599,7 +2525,10 @@ class StompDropPolicyTest extends StompT
       sync_send("/queue/drop.head.persistent", "%0100d".format(i))
     }
     subscribe("0", "/queue/drop.head.persistent")
-    for(i <- 446 until 1000) {
+
+    val initial = client.receive().split("\n").last.toInt
+    initial should be > ( 100 )
+    for(i <- (initial+1) until 1000) {
       assert_received("%0100d".format(i))
     }
   }
@@ -2611,7 +2540,9 @@ class StompDropPolicyTest extends StompT
       sync_send("/queue/drop.head.non", "%0100d".format(i))
     }
     subscribe("0", "/queue/drop.head.non")
-    for(i <- 427 until 1000) {
+    val initial = client.receive().split("\n").last.toInt
+    initial should be > ( 100 )
+    for(i <- (initial+1) until 1000) {
       assert_received("%0100d".format(i))
     }
   }
@@ -2654,4 +2585,49 @@ class StompDropPolicyTest extends StompT
     async_send("/queue/drop.tail.non", "end")
     assert_received("end")
   }
-}
\ No newline at end of file
+}
+
+class StompWildcardParallelTest extends StompTestSupport with BrokerParallelTestExecution {
+
+  def path_separator = "."
+
+  test("Wildcard subscription") {
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+      "destination:/queue/foo"+path_separator+"*\n" +
+      "id:1\n" +
+      "receipt:0\n"+
+      "\n")
+
+    wait_for_receipt("0")
+
+    def put(dest:String) = {
+      client.write(
+        "SEND\n" +
+        "destination:/queue/"+dest+"\n" +
+        "\n" +
+        "message:"+dest+"\n")
+    }
+
+    def get(dest:String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
+    }
+
+    // We should not get this one..
+    put("bar"+path_separator+"a")
+
+    put("foo"+path_separator+"a")
+    get("foo"+path_separator+"a")
+
+    put("foo"+path_separator+"b")
+    get("foo"+path_separator+"b")
+  }
+}
+class StompWildcardCustomParallelTest extends StompWildcardParallelTest {
+  override def broker_config_uri: String = "xml:classpath:apollo-stomp-custom-dest-delimiters.xml"
+  override def path_separator = "/"
+}

Modified: activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/main/scala/org/apache/activemq/apollo/util.scala Fri Jul 27 16:04:01 2012
@@ -28,6 +28,22 @@ package object util {
 
   type FutureResult[T] = Future[Result[T, Throwable]]
 
+  object FutureResult {
+
+    implicit def wrap_future_result[T](value:T):FutureResult[T] = {
+      val rc = FutureResult[T]()
+      rc.apply(Success(value))
+      rc
+    }
+
+    implicit def unwrap_future_result[T](value:FutureResult[T]):T = {
+      value.await() match {
+        case Success(value) => value
+        case Failure(value) => throw value
+      }
+    }
+  }
+
   def FutureResult[T]() = Future[Result[T, Throwable]]()
 
   def FutureResult[T](value:Result[T, Throwable]) = {
@@ -58,19 +74,6 @@ package object util {
     }
   }
 
-  implicit def wrap_future_result[T](value:T):FutureResult[T] = {
-    val rc = FutureResult[T]()
-    rc.apply(Success(value))
-    rc
-  }
-
-  implicit def unwrap_future_result[T](value:FutureResult[T]):T = {
-    value.await() match {
-      case Success(value) => value
-      case Failure(value) => throw value
-    }
-  }
-
   def sync_cb[T](func: (T=>Unit)=>Unit ) = {
     var rc:Option[T] = null
     val cd = new CountDownLatch(1)

Modified: activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala?rev=1366431&r1=1366430&r2=1366431&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-util/src/test/scala/org/apache/activemq/apollo/util/FunSuiteSupport.scala Fri Jul 27 16:04:01 2012
@@ -17,20 +17,28 @@
 
 package org.apache.activemq.apollo.util
 
-import _root_.org.junit.runner.RunWith
-import org.scalatest.junit.JUnitRunner
+import org.junit.runner.RunWith
 import java.io.File
 import java.lang.String
 import collection.immutable.Map
 import org.scalatest._
 import java.util.concurrent.TimeUnit
 import FileSupport._
+import scala.Some
+import org.apache.activemq.apollo.util.FunSuiteSupport._
+import java.util.concurrent.locks.{ReentrantReadWriteLock, Lock, ReadWriteLock}
+
+object FunSuiteSupport {
+  class SkipTestException extends RuntimeException
+  val parallel_test_class_lock = new ReentrantReadWriteLock()
+}
 
 /**
  * @version $Revision : 1.1 $
  */
-@RunWith(classOf[JUnitRunner])
-abstract class FunSuiteSupport extends FunSuite with Logging with BeforeAndAfterAll {
+@RunWith(classOf[org.scalatest.junit.ParallelJUnitRunner])
+abstract class FunSuiteSupport extends FunSuite with Logging with ParallelBeforeAndAfterAll {
+
   protected var _basedir = try {
     var file = new File(getClass.getProtectionDomain.getCodeSource.getLocation.getFile)
     file = (file / ".." / "..").getCanonicalFile
@@ -44,6 +52,18 @@ abstract class FunSuiteSupport extends F
       "."
   }
 
+  def skip(check:Boolean=true):Unit = if(check) throw new SkipTestException()
+
+  override protected def test(testName: String, testTags: Tag*)(testFun: => Unit) {
+    super.test(testName, testTags:_*) {
+      try {
+        testFun
+      } catch {
+        case e:SkipTestException =>
+      }
+    }
+  }
+
   /**
    * Returns the base directory of the current project
    */
@@ -52,9 +72,22 @@ abstract class FunSuiteSupport extends F
   /**
    * Returns ${basedir}/target/test-data
    */
-  def test_data_dir = basedir / "target"/ "test-data"
+  def test_data_dir = basedir / "target"/ "test-data" / (getClass.getName)
+
+  /**
+   * Can this test class run in parallel with other
+   * test classes.
+   * @return
+   */
+  def is_parallel_test_class = true
 
   override protected def beforeAll(map: Map[String, Any]): Unit = {
+    if ( is_parallel_test_class ) {
+      parallel_test_class_lock.readLock().lock()
+    } else {
+      parallel_test_class_lock.writeLock().lock()
+    }
+
     _basedir = map.get("basedir") match {
       case Some(basedir) =>
         basedir.toString
@@ -67,10 +100,37 @@ abstract class FunSuiteSupport extends F
     super.beforeAll(map)
   }
 
+  override protected def afterAll(configMap: Map[String, Any]) {
+    try {
+      super.afterAll(configMap)
+    } finally {
+      if ( is_parallel_test_class ) {
+        parallel_test_class_lock.readLock().unlock()
+      } else {
+        parallel_test_class_lock.writeLock().unlock()
+      }
+    }
+  }
+
+
   //
   // Allows us to get the current test name.
   //
 
+  /**
+   * Defines a method (that takes a <code>configMap</code>) to be run after
+   * all of this suite's tests and nested suites have been run.
+   *
+   * <p>
+   * This trait's implementation
+   * of <code>run</code> invokes this method after executing all tests
+   * and nested suites (passing in the <code>configMap</code> passed to it), thus this
+   * method can be used to tear down a test fixture
+   * needed by the entire suite. This trait's implementation of this method invokes the
+   * overloaded form of <code>afterAll</code> that takes no <code>configMap</code>.
+   * </p>
+   */
+
   val _testName = new ThreadLocal[String]();
 
   def testName = _testName.get
@@ -81,6 +141,7 @@ abstract class FunSuiteSupport extends F
       super.runTest(testName, reporter, stopper, configMap, tracker)
     } finally {
       _testName.remove
+
     }
   }