You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/28 02:25:44 UTC

svn commit: r1366571 [1/2] - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/ apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/ apollo-stomp/src/test/scala/org/apache/a...

Author: chirino
Date: Sat Jul 28 00:25:43 2012
New Revision: 1366571

URL: http://svn.apache.org/viewvc?rev=1366571&view=rev
Log:
Refactor stomp tests into their own source files.

Added:
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompBDBParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala
      - copied, changed from r1366566, activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompDropPolicyTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSecurityTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSerialTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslSecurityTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSslTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompWildcardCustomParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompWildcardParallelTest.scala
Removed:
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
    activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala?rev=1366571&r1=1366570&r2=1366571&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/store/DelayingStoreSupport.scala Sat Jul 28 00:25:43 2012
@@ -253,8 +253,7 @@ trait DelayingStoreSupport extends Store
       actions.values.foreach{ a =>
         // There must either be a dequeue or a message record for a enqueue request.
         // if not, then there should be a message locator
-
-        if( locator_based && a.message_record==null) {
+        if( a.message_record==null ) {
           if(!a.dequeues.isEmpty ){
             a.dequeues.foreach { d =>
               if ( d.message_locator.get() == null ) {
@@ -270,12 +269,8 @@ trait DelayingStoreSupport extends Store
             }
           }
         }
-        else if( !a.enqueues.isEmpty && ( a.message_record==null && a.dequeues.isEmpty ) ) {
-          return false
-        }
-
       }
-      true  
+      true
     }
 
     override def dispose = this.synchronized {
@@ -501,7 +496,7 @@ trait DelayingStoreSupport extends Store
         None
       } else {
         uow.state = UowFlushing
-        assert( uow.have_locators )
+        assert( !locator_based || uow.have_locators )
         // It will not be possible to cancel the UOW anymore..
         uow.actions.foreach { case (_, action) =>
           action.enqueues.foreach { queue_entry=>

Modified: activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala?rev=1366571&r1=1366570&r2=1366571&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-network/src/main/scala/org/apache/activemq/apollo/broker/network/web/NetworkWebModule.scala Sat Jul 28 00:25:43 2012
@@ -1,3 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.activemq.apollo.broker.network.web
 
 import org.apache.activemq.apollo.web.resources.{HelpResourceJSON, Resource}

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala?rev=1366571&r1=1366570&r2=1366571&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompWebTest.scala Sat Jul 28 00:25:43 2012
@@ -25,6 +25,7 @@ import java.util.Arrays
 import org.apache.activemq.apollo.util.FileSupport._
 import org.openqa.selenium.firefox.{FirefoxDriver, FirefoxProfile}
 import java.util.concurrent.TimeUnit._
+import org.apache.activemq.apollo.stomp.test._
 
 /**
  * <p>

Added: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompBDBParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompBDBParallelTest.scala?rev=1366571&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompBDBParallelTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompBDBParallelTest.scala Sat Jul 28 00:25:43 2012
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp.test
+
+import java.lang.String
+
+class StompBDBParallelTest extends StompLevelDBParallelTest {
+  override def broker_config_uri: String = "xml:classpath:apollo-stomp-bdb.xml"
+}

Copied: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala (from r1366566, activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala)
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala?p2=activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala&p1=activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala&r1=1366566&r2=1366571&rev=1366571&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompClient.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompClient.scala Sat Jul 28 00:25:43 2012
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.activemq.apollo.stomp
+package org.apache.activemq.apollo.stomp.test
 
 import java.net.{Socket, InetSocketAddress}
 import org.fusesource.hawtbuf.AsciiBuffer
@@ -24,21 +24,22 @@ import java.io._
 import org.apache.activemq.apollo.broker.{KeyStorage, ProtocolException}
 import javax.net.ssl.{SSLSocket, SSLContext}
 import org.scalatest.matchers.ShouldMatchers
+import org.apache.activemq.apollo.stomp.Stomp
 
 /**
  * A simple Stomp client used for testing purposes
  */
 class StompClient extends ShouldMatchers {
 
-  var socket:Socket = new Socket
-  var out:OutputStream = null
-  var in:InputStream = null
-  val bufferSize = 64*1204
-  var key_storeage:KeyStorage=null
+  var socket: Socket = new Socket
+  var out: OutputStream = null
+  var in: InputStream = null
+  val bufferSize = 64 * 1204
+  var key_storeage: KeyStorage = null
 
   def open(host: String, port: Int) = {
 
-    socket = if( key_storeage!=null ) {
+    socket = if (key_storeage != null) {
       val context = SSLContext.getInstance("TLS")
       context.init(key_storeage.create_key_managers, key_storeage.create_trust_managers, null)
       context.getSocketFactory().createSocket()
@@ -49,7 +50,7 @@ class StompClient extends ShouldMatchers
     }
     socket.connect(new InetSocketAddress(host, port))
     socket.setSoLinger(true, 1)
-    socket.setSoTimeout(30*1000)
+    socket.setSoTimeout(30 * 1000)
     out = new BufferedOutputStream(socket.getOutputStream, bufferSize)
     in = new BufferedInputStream(socket.getInputStream, bufferSize)
   }
@@ -58,24 +59,24 @@ class StompClient extends ShouldMatchers
     socket.close
   }
 
-  def write(frame:String) = {
+  def write(frame: String) = {
     out.write(frame.getBytes("UTF-8"))
     out.write(0)
     out.write('\n')
     out.flush
   }
 
-  def write(frame:Array[Byte]) = {
+  def write(frame: Array[Byte]) = {
     out.write(frame)
     out.write(0)
     out.write('\n')
     out.flush
   }
 
-  def skip():Unit = {
+  def skip(): Unit = {
     var c = in.read
-    while( c >= 0 ) {
-      if( c==0 ) {
+    while (c >= 0) {
+      if (c == 0) {
         return
       }
       c = in.read()
@@ -83,15 +84,15 @@ class StompClient extends ShouldMatchers
     throw new EOFException()
   }
 
-  def receive():String = {
+  def receive(): String = {
     var start = true;
     val buffer = new BAOS()
     var c = in.read
-    while( c >= 0 ) {
-      if( c==0 ) {
+    while (c >= 0) {
+      if (c == 0) {
         return new String(buffer.toByteArray, "UTF-8")
       }
-      if( !start || c!= Stomp.NEWLINE) {
+      if (!start || c != Stomp.NEWLINE) {
         start = false
         buffer.write(c)
       }
@@ -100,18 +101,18 @@ class StompClient extends ShouldMatchers
     throw new EOFException()
   }
 
-  def wait_for_receipt(id:String): Unit = {
+  def wait_for_receipt(id: String): Unit = {
     val frame = receive()
     frame should startWith("RECEIPT\n")
-    frame should include("receipt-id:"+id+"\n")
+    frame should include("receipt-id:" + id + "\n")
   }
 
 
-  def receiveAscii():AsciiBuffer = {
+  def receiveAscii(): AsciiBuffer = {
     val buffer = new BAOS()
     var c = in.read
-    while( c >= 0 ) {
-      if( c==0 ) {
+    while (c >= 0) {
+      if (c == 0) {
         return buffer.toBuffer.ascii
       }
       buffer.write(c)
@@ -120,10 +121,10 @@ class StompClient extends ShouldMatchers
     throw new EOFException()
   }
 
-  def receive(expect:String):String = {
+  def receive(expect: String): String = {
     val rc = receive()
-    if( !rc.startsWith(expect) ) {
-      throw new ProtocolException("Expected "+expect)
+    if (!rc.startsWith(expect)) {
+      throw new ProtocolException("Expected " + expect)
     }
     rc
   }

Added: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompDropPolicyTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompDropPolicyTest.scala?rev=1366571&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompDropPolicyTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompDropPolicyTest.scala Sat Jul 28 00:25:43 2012
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp.test
+
+import java.lang.String
+import org.apache.activemq.apollo.broker._
+
+class StompDropPolicyTest extends StompTestSupport with BrokerParallelTestExecution {
+
+  override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+
+  test("Head Drop Policy: Persistent") {
+    connect("1.1")
+    // Some of these messages should get dropped.
+    for (i <- 0 until 1000) {
+      sync_send("/queue/drop.head.persistent", "%0100d".format(i))
+    }
+    subscribe("0", "/queue/drop.head.persistent")
+
+    val initial = client.receive().split("\n").last.toInt
+    initial should be > (100)
+    for (i <- (initial + 1) until 1000) {
+      assert_received("%0100d".format(i))
+    }
+  }
+
+  test("Head Drop Policy: Non Persistent") {
+    connect("1.1")
+    // Some of these messages should get dropped.
+    for (i <- 0 until 1000) {
+      sync_send("/queue/drop.head.non", "%0100d".format(i))
+    }
+    subscribe("0", "/queue/drop.head.non")
+    val initial = client.receive().split("\n").last.toInt
+    initial should be > (100)
+    for (i <- (initial + 1) until 1000) {
+      assert_received("%0100d".format(i))
+    }
+  }
+
+  test("Tail Drop Policy: Persistent") {
+    connect("1.1")
+    // Some of these messages should get dropped.
+    for (i <- 0 until 1000) {
+      sync_send("/queue/drop.tail.persistent", "%0100d".format(i))
+    }
+
+    val metrics = queue_status("drop.tail.persistent").metrics
+    metrics.queue_items should be < (1000L)
+
+    subscribe("0", "/queue/drop.tail.persistent")
+    for (i <- 0L until metrics.queue_items) {
+      assert_received("%0100d".format(i))
+    }
+
+    async_send("/queue/drop.tail.persistent", "end")
+    assert_received("end")
+
+  }
+
+  test("Tail Drop Policy: Non Persistent") {
+    connect("1.1")
+    // Some of these messages should get dropped.
+    for (i <- 0 until 1000) {
+      sync_send("/queue/drop.tail.non", "%0100d".format(i))
+    }
+
+    val metrics = queue_status("drop.tail.non").metrics
+    metrics.queue_items should be < (1000L)
+
+    subscribe("0", "/queue/drop.tail.non")
+    for (i <- 0L until metrics.queue_items) {
+      assert_received("%0100d".format(i))
+    }
+
+    async_send("/queue/drop.tail.non", "end")
+    assert_received("end")
+  }
+}

Added: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala?rev=1366571&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompLevelDBParallelTest.scala Sat Jul 28 00:25:43 2012
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp.test
+
+import java.lang.String
+import java.util.concurrent.TimeUnit._
+import org.apache.activemq.apollo.broker._
+
+class StompLevelDBParallelTest extends StompParallelTest with BrokerParallelTestExecution {
+
+  override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+
+  test("(APLO-198) Apollo sometimes does not send all the messages in a queue") {
+    skip_if_using_store
+    connect("1.1")
+    for (i <- 0 until 10000) {
+      async_send("/queue/BIGQUEUE", "message #" + i)
+    }
+    sync_send("/queue/BIGQUEUE", "END")
+    client.close
+
+    var counter = 0
+    for (i <- 0 until 100) {
+      connect("1.1")
+      subscribe("1", "/queue/BIGQUEUE", "client", false, "", false)
+      for (j <- 0 until 100) {
+        assert_received("message #" + counter)(true)
+        counter += 1
+      }
+      client.write(
+        "DISCONNECT\n" +
+                "receipt:disco\n" +
+                "\n")
+      wait_for_receipt("disco", client, true)
+      client.close
+      within(2, SECONDS) {
+        val status = queue_status("BIGQUEUE")
+        status.consumers.size() should be(0)
+      }
+    }
+
+    connect("1.1")
+    subscribe("1", "/queue/BIGQUEUE", "client")
+    assert_received("END")(true)
+
+  }
+
+  test("Multiple dsubs contain the same messages (Test case for APLO-210)") {
+    skip_if_using_store
+
+    val sub_count = 3
+    val message_count = 1000
+
+    // establish 3 durable subs..
+    connect("1.1")
+    for (sub <- 1 to sub_count) {
+      subscribe(id = "sub" + sub, dest = "/topic/sometopic", persistent = true)
+    }
+    close()
+
+    connect("1.1")
+
+    val filler = ":" + ("x" * (1024 * 10))
+
+    // Now send a bunch of messages....
+    for (i <- 1 to message_count) {
+      async_send(dest = "/topic/sometopic", headers = "persistent:true\n", body = i + filler)
+    }
+
+    // Empty out the durable durable sub
+    for (sub <- 1 to sub_count) {
+      subscribe(id = "sub" + sub, dest = "/topic/sometopic", persistent = true, sync = false)
+      for (i <- 1 to message_count) {
+        assert_received(body = i + filler, sub = "sub" + sub)
+      }
+    }
+
+  }
+
+  test("Can directly send an recieve from a durable sub") {
+    skip_if_using_store
+    connect("1.1")
+
+    // establish 2 durable subs..
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/topic/sometopic\n" +
+              "id:sub1\n" +
+              "persistent:true\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/topic/sometopic\n" +
+              "id:sub2\n" +
+              "persistent:true\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    client.close
+    connect("1.1")
+
+    // Now send a bunch of messages....
+    // Send only to sub 1
+    client.write(
+      "SEND\n" +
+              "destination:/dsub/sub1\n" +
+              "\n" +
+              "sub1 msg\n")
+
+    // Send to all subs
+    client.write(
+      "SEND\n" +
+              "destination:/topic/sometopic\n" +
+              "\n" +
+              "LAST\n")
+
+
+    // Now try to get all the previously sent messages.
+    def get(expected: String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\n" + expected)
+    }
+
+    // Empty out the first durable sub
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/dsub/sub1\n" +
+              "id:1\n" +
+              "\n")
+
+    get("sub1 msg\n")
+    get("LAST\n")
+
+    // Empty out the 2nd durable sub
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/dsub/sub2\n" +
+              "id:2\n" +
+              "\n")
+
+    get("LAST\n")
+  }
+  test("You can connect and then unsubscribe from existing durable sub (APLO-157)") {
+    skip_if_using_store
+    connect("1.1")
+    subscribe("APLO-157", "/topic/APLO-157", "auto", true)
+    client.close()
+
+    // Make sure the durable sub exists.
+    connect("1.1")
+    sync_send("/topic/APLO-157", "1")
+    subscribe("APLO-157", "/topic/APLO-157", "client", true)
+    assert_received("1")
+    client.close()
+
+    // Delete the durable sub..
+    connect("1.1")
+    unsubscribe("APLO-157", "persistent:true\n")
+    client.close()
+
+    // Make sure the durable sub does not exists.
+    connect("1.1")
+    subscribe("APLO-157", "/topic/APLO-157", "client", true)
+    async_send("/topic/APLO-157", "2")
+    assert_received("2")
+    unsubscribe("APLO-157", "persistent:true\n")
+
+  }
+
+  test("Can create dsubs with dots in them") {
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/topic/sometopic\n" +
+              "id:sub.1\n" +
+              "persistent:true\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    client.write(
+      "SEND\n" +
+              "destination:/dsub/sub.1\n" +
+              "receipt:0\n" +
+              "\n" +
+              "content\n")
+    wait_for_receipt("0")
+
+  }
+
+  test("Duplicate SUBSCRIBE updates durable subscription bindings") {
+    skip_if_using_store
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/topic/a\n" +
+              "id:sub1\n" +
+              "persistent:true\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    def get(expected: String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\n" + expected)
+    }
+
+    // Validate that the durable sub is bound to /topic/a
+    client.write(
+      "SEND\n" +
+              "destination:/topic/a\n" +
+              "\n" +
+              "1\n")
+    get("1\n")
+
+    client.write(
+      "UNSUBSCRIBE\n" +
+              "id:sub1\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    // Switch the durable sub to /topic/b
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/topic/b\n" +
+              "id:sub1\n" +
+              "persistent:true\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    // all these should get dropped
+    for (i <- 1 to 500) {
+      client.write(
+        "SEND\n" +
+                "destination:/topic/a\n" +
+                "\n" +
+                "DROPPED\n")
+    }
+
+    // Not this one.. it's on the updated topic
+    client.write(
+      "SEND\n" +
+              "destination:/topic/b\n" +
+              "\n" +
+              "2\n")
+    get("2\n")
+
+  }
+
+  test("Direct send to a non-existant a durable sub fails") {
+    connect("1.1")
+
+    client.write(
+      "SEND\n" +
+              "destination:/dsub/doesnotexist\n" +
+              "receipt:0\n" +
+              "\n" +
+              "content\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:The destination does not exist")
+  }
+
+  test("Direct subscribe to a non-existant a durable sub fails") {
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/dsub/doesnotexist\n" +
+              "id:1\n" +
+              "receipt:0\n" +
+              "\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Durable subscription does not exist")
+
+  }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala?rev=1366571&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompMetricsTest.scala Sat Jul 28 00:25:43 2012
@@ -0,0 +1,267 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp.test
+
+import java.util.concurrent.TimeUnit._
+
+/**
+ * These test cases check to make sure the broker stats are consistent with what
+ * would be expected.  These tests can't be run in parallel since they look at
+ * aggregate destination metrics.
+ */
+class StompMetricsTest extends StompTestSupport {
+
+  test("slow_consumer_policy='queue' metrics stay consistent on consumer close (APLO-211)") {
+    connect("1.1")
+
+    subscribe("0", "/topic/queued.APLO-211", "client");
+    async_send("/topic/queued.APLO-211", 1)
+    assert_received(1)(true)
+
+    val stat1 = topic_status("queued.APLO-211").metrics
+    disconnect()
+
+    within(3, SECONDS) {
+      val stat2 = topic_status("queued.APLO-211").metrics
+      stat2.producer_count should be(stat1.producer_count - 1)
+      stat2.consumer_count should be(stat1.consumer_count - 1)
+      stat2.enqueue_item_counter should be(stat1.enqueue_item_counter)
+      stat2.dequeue_item_counter should be(stat1.dequeue_item_counter)
+    }
+  }
+
+
+  test("Deleted qeueus are removed to aggregate queue-stats") {
+    connect("1.1")
+
+    val stat1 = get_queue_metrics
+
+    async_send("/queue/willdelete", 1)
+    async_send("/queue/willdelete", 2)
+    sync_send("/queue/willdelete", 3)
+
+    // not acked yet.
+    val stat2 = get_queue_metrics
+    stat2.producer_count should be(stat1.producer_count + 1)
+    stat2.consumer_count should be(stat1.consumer_count)
+    stat2.enqueue_item_counter should be(stat1.enqueue_item_counter + 3)
+    stat2.dequeue_item_counter should be(stat1.dequeue_item_counter + 0)
+    stat2.queue_items should be(stat1.queue_items + 3)
+
+    // Delete the queue
+    delete_queue("willdelete")
+
+    within(1, SECONDS) {
+      val stat3 = get_queue_metrics
+      stat3.producer_count should be(stat1.producer_count)
+      stat3.consumer_count should be(stat1.consumer_count)
+      stat3.enqueue_item_counter should be(stat1.enqueue_item_counter + 3)
+      stat3.dequeue_item_counter should be(stat1.dequeue_item_counter)
+      stat3.queue_items should be(stat1.queue_items)
+    }
+  }
+
+  test("Old consumers on topic slow_consumer_policy='queue' does not affect the agregate queue-metrics") {
+    connect("1.1")
+
+    subscribe("0", "/topic/queued.test1", "client");
+    sync_send("/topic/queued.test1", 1)
+
+    val stat1 = get_topic_metrics
+
+    async_send("/topic/queued.test1", 2)
+    async_send("/topic/queued.test1", 3)
+    val ack1 = assert_received(1)
+    val ack2 = assert_received(2)
+    val ack3 = assert_received(3)
+
+    // not acked yet.
+    val stat2 = get_topic_metrics
+    stat2.producer_count should be(stat1.producer_count)
+    stat2.consumer_count should be(stat1.consumer_count)
+    stat2.enqueue_item_counter should be(stat1.enqueue_item_counter + 2)
+    stat2.dequeue_item_counter should be(stat1.dequeue_item_counter + 0)
+    stat2.queue_items should be(stat1.queue_items + 2)
+
+    // Close the subscription.
+    unsubscribe("0")
+
+    within(1, SECONDS) {
+      val stat3 = get_topic_metrics
+      stat3.producer_count should be(stat1.producer_count)
+      stat3.consumer_count should be(stat1.consumer_count - 1)
+      stat3.enqueue_item_counter should be(stat1.enqueue_item_counter + 2)
+      stat3.dequeue_item_counter should be(stat1.dequeue_item_counter + 0)
+      stat3.queue_items should be(stat1.queue_items - 1)
+    }
+  }
+
+  test("New Topic Stats") {
+    connect("1.1")
+    subscribe("0", "/topic/newstats");
+    val stats = topic_status("newstats")
+    var now = System.currentTimeMillis()
+    (now - stats.metrics.enqueue_ts) should (be < 10 * 1000L)
+    (now - stats.metrics.dequeue_ts) should (be < 10 * 1000L)
+  }
+
+  test("Topic Stats") {
+    connect("1.1")
+
+    sync_send("/topic/stats", 1)
+    val stat1 = topic_status("stats")
+    stat1.producers.size() should be(1)
+    stat1.consumers.size() should be(0)
+    stat1.dsubs.size() should be(0)
+    stat1.metrics.enqueue_item_counter should be(1)
+    stat1.metrics.dequeue_item_counter should be(0)
+    stat1.metrics.queue_items should be(0)
+
+    subscribe("0", "/topic/stats");
+    async_send("/topic/stats", 2)
+    async_send("/topic/stats", 3)
+    assert_received(2)
+    assert_received(3)
+
+    val stat2 = topic_status("stats")
+    stat2.producers.size() should be(1)
+    stat2.consumers.size() should be(1)
+    stat2.dsubs.size() should be(0)
+    stat2.metrics.enqueue_item_counter should be(3)
+    stat2.metrics.dequeue_item_counter should be(2)
+    stat2.metrics.queue_items should be(0)
+    client.close()
+
+    within(1, SECONDS) {
+      val stat3 = topic_status("stats")
+      stat3.producers.size() should be(0)
+      stat3.consumers.size() should be(0)
+      stat3.dsubs.size() should be(0)
+      stat3.metrics.enqueue_item_counter should be(3)
+      stat3.metrics.dequeue_item_counter should be(2)
+      stat3.metrics.queue_items should be(0)
+    }
+  }
+
+  test("Topic slow_consumer_policy='queue' Stats") {
+    connect("1.1")
+
+    sync_send("/topic/queued.stats", 1)
+    val stat1 = topic_status("queued.stats")
+    stat1.producers.size() should be(1)
+    stat1.consumers.size() should be(0)
+    stat1.dsubs.size() should be(0)
+    stat1.metrics.enqueue_item_counter should be(1)
+    stat1.metrics.dequeue_item_counter should be(0)
+    stat1.metrics.queue_items should be(0)
+
+    subscribe("0", "/topic/queued.stats", "client");
+    async_send("/topic/queued.stats", 2)
+    async_send("/topic/queued.stats", 3)
+    val ack2 = assert_received(2)
+    val ack3 = assert_received(3)
+
+    // not acked yet.
+    val stat2 = topic_status("queued.stats")
+    stat2.producers.size() should be(1)
+    stat2.consumers.size() should be(1)
+    stat2.dsubs.size() should be(0)
+    stat2.metrics.enqueue_item_counter should be(3)
+    stat2.metrics.dequeue_item_counter should be(0)
+    stat2.metrics.queue_items should be(2)
+
+    // Ack now..
+    ack2(true);
+    ack3(true)
+
+    within(1, SECONDS) {
+      val stat3 = topic_status("queued.stats")
+      stat3.producers.size() should be(1)
+      stat3.consumers.size() should be(1)
+      stat3.dsubs.size() should be(0)
+      stat3.metrics.enqueue_item_counter should be(3)
+      stat3.metrics.dequeue_item_counter should be(2)
+      stat3.metrics.queue_items should be(0)
+    }
+
+    unsubscribe("0")
+    client.close()
+    within(1, SECONDS) {
+      val stat4 = topic_status("queued.stats")
+      stat4.producers.size() should be(0)
+      stat4.consumers.size() should be(0)
+      stat4.dsubs.size() should be(0)
+      stat4.metrics.enqueue_item_counter should be(3)
+      stat4.metrics.dequeue_item_counter should be(2)
+      stat4.metrics.queue_items should be(0)
+    }
+  }
+
+  test("Topic Durable Sub Stats.") {
+    connect("1.1")
+
+    sync_send("/topic/dsubed.stats", 1)
+    val stat1 = topic_status("dsubed.stats")
+    stat1.producers.size() should be(1)
+    stat1.consumers.size() should be(0)
+    stat1.dsubs.size() should be(0)
+    stat1.metrics.enqueue_item_counter should be(1)
+    stat1.metrics.dequeue_item_counter should be(0)
+    stat1.metrics.queue_items should be(0)
+
+    subscribe("dsub1", "/topic/dsubed.stats", "client", true);
+    async_send("/topic/dsubed.stats", 2)
+    async_send("/topic/dsubed.stats", 3)
+    val ack2 = assert_received(2)
+    val ack3 = assert_received(3)
+
+    // not acked yet.
+    val stat2 = topic_status("dsubed.stats")
+    stat2.producers.size() should be(1)
+    stat2.consumers.size() should be(1)
+    stat2.dsubs.size() should be(1)
+    stat2.metrics.enqueue_item_counter should be(3)
+    stat2.metrics.dequeue_item_counter should be(0)
+    stat2.metrics.queue_items should be(2)
+
+    // Ack SOME now..
+    ack2(true);
+
+    within(1, SECONDS) {
+      val stat3 = topic_status("dsubed.stats")
+      stat3.producers.size() should be(1)
+      stat3.consumers.size() should be(1)
+      stat3.dsubs.size() should be(1)
+      stat3.metrics.enqueue_item_counter should be(3)
+      stat3.metrics.dequeue_item_counter should be(1)
+      stat3.metrics.queue_items should be(1)
+    }
+
+    unsubscribe("dsub1")
+    client.close()
+    within(1, SECONDS) {
+      val stat4 = topic_status("dsubed.stats")
+      stat4.producers.size() should be(0)
+      stat4.consumers.size() should be(1)
+      stat4.dsubs.size() should be(1)
+      stat4.metrics.enqueue_item_counter should be(3)
+      stat4.metrics.dequeue_item_counter should be(1)
+      stat4.metrics.queue_items should be(1)
+    }
+  }
+
+}

Added: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1366571&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Sat Jul 28 00:25:43 2012
@@ -0,0 +1,1527 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp.test
+
+import java.util.concurrent.TimeUnit._
+import java.nio.channels.DatagramChannel
+import org.fusesource.hawtbuf.AsciiBuffer
+import org.apache.activemq.apollo.broker._
+import java.net.{SocketTimeoutException, InetSocketAddress}
+import org.apache.activemq.apollo.stomp.{Stomp, StompProtocolHandler}
+
+/**
+ * These tests can be run in parallel against a single Apollo broker.
+ */
+class StompParallelTest extends StompTestSupport with BrokerParallelTestExecution {
+
+  def skip_if_using_store = skip(broker_config_uri.endsWith("-bdb.xml") || broker_config_uri.endsWith("-leveldb.xml"))
+
+  test("Stomp 1.0 CONNECT") {
+    connect("1.0")
+  }
+
+  test("Stomp 1.1 CONNECT") {
+    connect("1.1")
+  }
+
+  test("Stomp 1.1 CONNECT /w STOMP Action") {
+
+    client.open("localhost", port)
+
+    client.write(
+      "STOMP\n" +
+              "accept-version:1.0,1.1\n" +
+              "host:localhost\n" +
+              "\n")
+    val frame = client.receive()
+    frame should startWith("CONNECTED\n")
+    frame should include regex ("""session:.+?\n""")
+    frame should include("version:1.1\n")
+  }
+
+  test("Stomp 1.1 CONNECT /w valid version fallback") {
+
+    client.open("localhost", port)
+
+    client.write(
+      "CONNECT\n" +
+              "accept-version:1.0,10.0\n" +
+              "host:localhost\n" +
+              "\n")
+    val frame = client.receive()
+    frame should startWith("CONNECTED\n")
+    frame should include regex ("""session:.+?\n""")
+    frame should include("version:1.0\n")
+  }
+
+  test("Stomp 1.1 CONNECT /w invalid version fallback") {
+
+    client.open("localhost", port)
+
+    client.write(
+      "CONNECT\n" +
+              "accept-version:9.0,10.0\n" +
+              "host:localhost\n" +
+              "\n")
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include regex ("""version:.+?\n""")
+    frame should include regex ("""message:.+?\n""")
+  }
+
+  test("Stomp CONNECT /w invalid virtual host") {
+
+    client.open("localhost", port)
+
+    client.write(
+      "CONNECT\n" +
+              "accept-version:1.0,1.1\n" +
+              "host:invalid\n" +
+              "\n")
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include regex ("""message:.+?\n""")
+  }
+
+  test("Stomp 1.1 Broker sends heart-beat") {
+
+    client.open("localhost", port)
+
+    client.write(
+      "CONNECT\n" +
+              "accept-version:1.1\n" +
+              "host:localhost\n" +
+              "heart-beat:0,1000\n" +
+              "\n")
+    val frame = client.receive()
+    frame should startWith("CONNECTED\n")
+    frame should include regex ("""heart-beat:.+?\n""")
+
+    def heart_beat_after(time: Long) {
+      var start = System.currentTimeMillis
+      val c = client.in.read()
+      c should be === (Stomp.NEWLINE)
+      var end = System.currentTimeMillis
+      (end - start) should be >= time
+    }
+    client.in.read()
+    heart_beat_after(900)
+    heart_beat_after(900)
+  }
+
+
+  test("Stomp 1.1 Broker times out idle connection") {
+    StompProtocolHandler.inbound_heartbeat = 1000L
+    try {
+
+      client.open("localhost", port)
+
+      client.write(
+        "CONNECT\n" +
+                "accept-version:1.1\n" +
+                "host:localhost\n" +
+                "heart-beat:1000,0\n" +
+                "\n")
+
+      var frame = client.receive()
+      frame should startWith("CONNECTED\n")
+      frame should include regex ("""heart-beat:.+?\n""")
+
+      var start = System.currentTimeMillis
+
+      frame = client.receive()
+      frame should startWith("ERROR\n")
+      frame should include regex ("""message:.+?\n""")
+
+      var end = System.currentTimeMillis
+      (end - start) should be >= 1000L
+
+    } finally {
+      StompProtocolHandler.inbound_heartbeat = StompProtocolHandler.DEFAULT_INBOUND_HEARTBEAT
+    }
+  }
+
+  test("UDP to STOMP interop") {
+
+    connect("1.1")
+    subscribe("0", "/topic/udp")
+
+    val udp_port: Int = connector_port("udp").get
+    val channel = DatagramChannel.open();
+
+    val target = new InetSocketAddress("127.0.0.1", udp_port)
+    channel.send(new AsciiBuffer("Hello").toByteBuffer, target)
+
+    assert_received("Hello")
+  }
+
+  /**
+   * These disconnect tests assure that we don't drop message deliviers that are in flight
+   * if a client disconnects before those deliveries are accepted by the target destination.
+   */
+  test("Messages delivery assured to a queued once a disconnect receipt is received") {
+
+    // figure out at what point a quota'ed queue stops accepting more messages.
+    connect("1.1")
+    client.socket.setSoTimeout(1 * 1000)
+    var block_count = 0
+    try {
+      while (true) {
+        sync_send("/queue/quota.assured1", "%01024d".format(block_count))
+        block_count += 1
+      }
+    } catch {
+      case e: SocketTimeoutException =>
+    }
+    close()
+
+    // Send 5 more messages which do not fit in the queue, they will be
+    // held in the producer connection's delivery session buffer..
+    connect("1.1")
+    for (i <- 0 until (block_count + 5)) {
+      async_send("/queue/quota.assured2", "%01024d".format(i))
+    }
+
+    // Even though we disconnect, those 5 that did not fit should still
+    // get delivered once the queue unblocks..
+    disconnect()
+
+    // Lets make sure non of the messages were dropped.
+    connect("1.1")
+    subscribe("0", "/queue/quota.assured2")
+    for (i <- 0 until (block_count + 5)) {
+      assert_received("%01024d".format(i))
+    }
+
+  }
+
+  test("Messages delivery assured to a topic once a disconnect receipt is received") {
+
+    //setup a subscription which will block quickly..
+    var consumer = new StompClient
+    connect("1.1", consumer)
+    subscribe("0", "/topic/quota.assured1", "client", headers = "credit:1,0\n", c = consumer)
+
+    // figure out at what point a quota'ed consumer stops accepting more messages.
+    connect("1.1")
+    client.socket.setSoTimeout(1 * 1000)
+    var block_count = 0
+    try {
+      while (true) {
+        sync_send("/topic/quota.assured1", "%01024d".format(block_count))
+        block_count += 1
+      }
+    } catch {
+      case e: SocketTimeoutException =>
+    }
+    close()
+    close(consumer)
+
+    connect("1.1", consumer)
+    subscribe("0", "/topic/quota.assured2", "client", headers = "credit:1,0\n", c = consumer)
+
+    // Send 5 more messages which do not fit in the consumer buffer, they will be
+    // held in the producer connection's delivery session buffer..
+    connect("1.1")
+    for (i <- 0 until (block_count + 5)) {
+      async_send("/topic/quota.assured2", "%01024d".format(i))
+    }
+
+    // Even though we disconnect, those 5 that did not fit should still
+    // get delivered once the queue unblocks..
+    disconnect()
+
+    // Lets make sure non of the messages were dropped.
+    for (i <- 0 until (block_count + 5)) {
+      assert_received("%01024d".format(i), c = consumer)(true)
+    }
+
+  }
+
+  test("APLO-206 - Load balance of job queues using small consumer credit windows") {
+    connect("1.1")
+
+    for (i <- 1 to 4) {
+      async_send("/queue/load-balanced2", i)
+    }
+
+    subscribe("1", "/queue/load-balanced2", "client", false, "credit:1,0\n")
+    val ack1 = assert_received(1, "1")
+
+    subscribe("2", "/queue/load-balanced2", "client", false, "credit:1,0\n")
+    val ack2 = assert_received(2, "2")
+
+    // Ok lets ack now..
+    ack1(true)
+    val ack3 = assert_received(3, "1")
+
+    ack2(true)
+    val ack4 = assert_received(4, "2")
+  }
+
+  test("Browsing queues does not cause AssertionError.  Reported in APLO-156") {
+    skip_if_using_store
+    connect("1.1")
+    subscribe("0", "/queue/TOOL.DEFAULT")
+    async_send("/queue/TOOL.DEFAULT", "1")
+    async_send("/queue/TOOL.DEFAULT", "2")
+    assert_received("1", "0")
+    assert_received("2", "0")
+    subscribe("1", "/queue/TOOL.DEFAULT", "auto", false, "browser:true\n")
+    val frame = client.receive()
+    frame should startWith(
+      "MESSAGE\n" +
+              "subscription:1\n" +
+              "destination:\n" +
+              "message-id:\n" +
+              "browser:end")
+  }
+
+  test("retain:set makes a topic remeber the message") {
+    connect("1.1")
+    async_send("/topic/retained-example", 1)
+    async_send("/topic/retained-example", 2, "retain:set\n")
+    sync_send("/topic/retained-example", 3)
+    subscribe("0", "/topic/retained-example")
+    assert_received(2)
+    async_send("/topic/retained-example", 4)
+    assert_received(4)
+  }
+
+  test("retain:remove makes a topic forget the message") {
+    connect("1.1")
+    async_send("/topic/retained-example2", 1)
+    async_send("/topic/retained-example2", 2, "retain:set\n")
+    async_send("/topic/retained-example2", 3, "retain:remove\n")
+    subscribe("0", "/topic/retained-example2")
+    async_send("/topic/retained-example2", 4)
+    assert_received(4)
+  }
+
+  test("Setting `from-seq` header to -1 results in subscription starting at end of the queue.") {
+    skip_if_using_store
+    connect("1.1")
+
+    def send(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/queue/from-seq-end\n" +
+                "receipt:0\n" +
+                "\n" +
+                "message:" + id + "\n")
+      wait_for_receipt("0")
+    }
+
+    send(1)
+    send(2)
+    send(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/from-seq-end\n" +
+              "receipt:0\n" +
+              "browser:true\n" +
+              "browser-end:false\n" +
+              "id:0\n" +
+              "from-seq:-1\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    send(4)
+
+    def get(seq: Long) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should include("message:" + seq + "\n")
+    }
+    get(4)
+  }
+
+  test("The `browser-end:false` can be used to continously browse a queue.") {
+    connect("1.1")
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/browsing-continous\n" +
+              "browser:true\n" +
+              "browser-end:false\n" +
+              "receipt:0\n" +
+              "id:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    def send(id: Int) = client.write(
+      "SEND\n" +
+              "destination:/queue/browsing-continous\n" +
+              "\n" +
+              "message:" + id + "\n")
+
+    send(1)
+    send(2)
+
+    def get(seq: Long) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      expect(true)(frame.contains("message:" + seq + "\n"))
+    }
+    get(1)
+    get(2)
+  }
+
+  test("Message sequence headers are added when `include-seq` is used.") {
+    connect("1.1")
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/seq_queue\n" +
+              "receipt:0\n" +
+              "id:0\n" +
+              "include-seq:seq\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    def send(id: Int) = client.write(
+      "SEND\n" +
+              "destination:/queue/seq_queue\n" +
+              "\n" +
+              "message:" + id + "\n")
+
+    send(1)
+    send(2)
+
+    def get(seq: Long) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      expect(true)(frame.contains("seq:" + seq + "\n"))
+    }
+    get(1)
+    get(2)
+  }
+
+  test("The `from-seq` header can be used to resume delivery from a given point in a queue.") {
+    skip_if_using_store
+    connect("1.1")
+
+    def send(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/queue/from_queue\n" +
+                "receipt:0\n" +
+                "\n" +
+                "message:" + id + "\n")
+      wait_for_receipt("0")
+    }
+
+    send(1)
+    send(2)
+    send(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/from_queue\n" +
+              "receipt:0\n" +
+              "browser:true\n" +
+              "id:0\n" +
+              "include-seq:seq\n" +
+              "from-seq:2\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    def get(seq: Long) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should include("seq:" + seq + "\n")
+    }
+    get(2)
+    get(3)
+  }
+
+
+  test("The `from-seq` header is not supported with wildcard or composite destinations.") {
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/some,/queue/other\n" +
+              "browser:true\n" +
+              "id:0\n" +
+              "include-seq:seq\n" +
+              "from-seq:2\n" +
+              "\n")
+
+    var frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:The from-seq header is only supported when you subscribe to one destination")
+
+    client.close
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/some.*\n" +
+              "browser:true\n" +
+              "id:0\n" +
+              "include-seq:seq\n" +
+              "from-seq:2\n" +
+              "\n")
+
+    frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:The from-seq header is only supported when you subscribe to one destination")
+  }
+
+  test("Selector Syntax") {
+    connect("1.1")
+
+    var sub_id = 0;
+    def test_selector(selector: String, headers: List[String], expected_matches: List[Int]) = {
+
+      client.write(
+        "SUBSCRIBE\n" +
+                "destination:/topic/selected\n" +
+                "selector:" + selector + "\n" +
+                "receipt:0\n" +
+                "id:" + sub_id + "\n" +
+                "\n")
+      wait_for_receipt("0")
+
+      var id = 1;
+
+      headers.foreach {
+        header =>
+          client.write(
+            "SEND\n" +
+                    "destination:/topic/selected\n" +
+                    header + "\n" +
+                    "\n" +
+                    "message:" + id + "\n")
+          id += 1;
+      }
+
+      expected_matches.foreach {
+        id =>
+          val frame = client.receive()
+          frame should startWith("MESSAGE\n")
+          frame should endWith regex ("\n\nmessage:" + id + "\n")
+      }
+
+      client.write(
+        "UNSUBSCRIBE\n" +
+                "id:" + sub_id + "\n" +
+                "receipt:0\n" +
+                "\n")
+
+      wait_for_receipt("0")
+
+      sub_id += 1
+    }
+
+    test_selector("color = 'red'", List("color:blue", "not:set", "color:red"), List(3))
+    test_selector("hyphen-field = 'red'", List("hyphen-field:blue", "not:set", "hyphen-field:red"), List(3))
+    test_selector("age >= 21", List("age:3", "not:set", "age:21", "age:30"), List(3, 4))
+
+  }
+
+  test("Queues load balance across subscribers") {
+    connect("1.1")
+    subscribe("1", "/queue/load-balanced")
+    subscribe("2", "/queue/load-balanced")
+
+    for (i <- 0 until 4) {
+      async_send("/queue/load-balanced", "message:" + i)
+    }
+
+    var sub1_counter = 0
+    var sub2_counter = 0
+
+    def get() = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+
+      if (frame.contains("subscription:1\n")) {
+        sub1_counter += 1
+      } else if (frame.contains("subscription:2\n")) {
+        sub2_counter += 1
+      }
+    }
+
+    for (i <- 0 until 4) {
+      get()
+    }
+
+    sub1_counter should be(2)
+    sub2_counter should be(2)
+
+  }
+
+  test("Queues do NOT load balance across exclusive subscribers") {
+    connect("1.1")
+
+    // Connect to subscribers
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/exclusive\n" +
+              "id:1\n" +
+              "\n")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/exclusive\n" +
+              "exclusive:true\n" +
+              "receipt:0\n" +
+              "ack:client\n" +
+              "id:2\n" +
+              "\n")
+
+    wait_for_receipt("0")
+
+    def put(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/queue/exclusive\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+
+    for (i <- 0 until 4) {
+      put(i)
+    }
+
+    var sub1_counter = 0
+    var sub2_counter = 0
+
+    def get() = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+
+      if (frame.contains("subscription:1\n")) {
+        sub1_counter += 1
+      } else if (frame.contains("subscription:2\n")) {
+        sub2_counter += 1
+      }
+    }
+
+    for (i <- 0 until 4) {
+      get()
+    }
+
+    sub1_counter should be(0)
+    sub2_counter should be(4)
+
+    // disconnect the exclusive subscriber.
+    client.write(
+      "UNSUBSCRIBE\n" +
+              "id:2\n" +
+              "\n")
+
+    // sub 1 should now get all the messages.
+    for (i <- 0 until 4) {
+      get()
+    }
+    sub1_counter should be(4)
+
+  }
+
+  test("Queue browsers don't consume the messages") {
+    skip_if_using_store
+    connect("1.1")
+
+    def put(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/queue/browsing\n" +
+                "receipt:0\n" +
+                "\n" +
+                "message:" + id + "\n")
+      wait_for_receipt("0")
+    }
+
+    put(1)
+    put(2)
+    put(3)
+
+    // create a browser subscription.
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/browsing\n" +
+              "browser:true\n" +
+              "id:0\n" +
+              "\n")
+
+    def get(sub: Int, id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should include("subscription:%d\n".format(sub))
+      frame should endWith regex ("\n\nmessage:%d\n".format(id))
+    }
+    get(0, 1)
+    get(0, 2)
+    get(0, 3)
+
+    // Should get a browse end message
+    val frame = client.receive()
+    frame should startWith("MESSAGE\n")
+    frame should include("subscription:0\n")
+    frame should include("browser:end\n")
+    frame should include("\nmessage-id:")
+    frame should include("\ndestination:")
+
+    // create a regular subscription.
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/browsing\n" +
+              "id:1\n" +
+              "\n")
+
+    get(1, 1)
+    get(1, 2)
+    get(1, 3)
+
+  }
+
+  test("Queue order preserved") {
+    connect("1.1")
+
+    def put(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/queue/example\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+    put(1)
+    put(2)
+    put(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/example\n" +
+              "id:0\n" +
+              "\n")
+
+    def get(id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should include("subscription:0\n")
+      frame should endWith regex ("\n\nmessage:" + id + "\n")
+    }
+    get(1)
+    get(2)
+    get(3)
+  }
+
+  test("Topic drops messages sent before before subscription is established") {
+    connect("1.1")
+
+    def put(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/topic/updates1\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+    put(1)
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/topic/updates1\n" +
+              "id:0\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    put(2)
+    put(3)
+
+    def get(id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should include("subscription:0\n")
+      frame should endWith regex ("\n\nmessage:" + id + "\n")
+    }
+
+    // note that the put(1) message gets dropped.
+    get(2)
+    get(3)
+  }
+
+  test("Topic /w Durable sub retains messages.") {
+    connect("1.1")
+
+    def put(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/topic/updates2\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/topic/updates2\n" +
+              "id:my-sub-name\n" +
+              "persistent:true\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+    client.close
+
+    // Close him out.. since persistent:true then
+    // the topic subscription will be persistent accross client
+    // connections.
+
+    connect("1.1")
+    put(1)
+    put(2)
+    put(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/topic/updates2\n" +
+              "id:my-sub-name\n" +
+              "persistent:true\n" +
+              "\n")
+
+    def get(id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should include("subscription:my-sub-name\n")
+      frame should endWith regex ("\n\nmessage:" + id + "\n")
+    }
+
+    get(1)
+    get(2)
+    get(3)
+  }
+
+  test("Queue and a selector") {
+    connect("1.1")
+
+    def put(id: Int, color: String) = {
+      client.write(
+        "SEND\n" +
+                "destination:/queue/selected\n" +
+                "color:" + color + "\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+    put(1, "red")
+    put(2, "blue")
+    put(3, "red")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/selected\n" +
+              "selector:color='red'\n" +
+              "id:0\n" +
+              "\n")
+
+    def get(id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex ("\n\nmessage:" + id + "\n")
+    }
+    get(1)
+    get(3)
+  }
+
+  test("Topic and a selector") {
+    connect("1.1")
+
+    def put(id: Int, color: String) = {
+      client.write(
+        "SEND\n" +
+                "destination:/topic/selected\n" +
+                "color:" + color + "\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/topic/selected\n" +
+              "selector:color='red'\n" +
+              "id:0\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    put(1, "red")
+    put(2, "blue")
+    put(3, "red")
+
+    def get(id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex ("\n\nmessage:" + id + "\n")
+    }
+    get(1)
+    get(3)
+  }
+
+  test("Topic gets copy of message sent to queue") {
+    connect("1.1")
+    subscribe("1", "/topic/mirrored.a")
+    async_send("/queue/mirrored.a", "message:1\n")
+    assert_received("message:1\n")
+  }
+
+  test("Queue gets copy of message sent to topic") {
+    connect("1.1")
+
+    // Connect to subscribers
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/mirrored.b\n" +
+              "id:1\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    def put(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/topic/mirrored.b\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+
+    put(1)
+
+    def get(id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex ("\n\nmessage:" + id + "\n")
+    }
+    get(1)
+
+  }
+
+  test("Queue does not get copies from topic until it's first created") {
+    connect("1.1")
+
+    def put(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/topic/mirrored.c\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+
+    put(1)
+
+    // Connect to subscribers
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/mirrored.c\n" +
+              "id:1\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    put(2)
+
+    def get(id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex ("\n\nmessage:" + id + "\n")
+    }
+    get(2)
+  }
+
+  def path_separator = "."
+
+  test("Messages Expire") {
+    connect("1.1")
+
+    def put(msg: String, ttl: Option[Long] = None) = {
+      val expires_header = ttl.map(t => "expires:" + (System.currentTimeMillis() + t) + "\n").getOrElse("")
+      client.write(
+        "SEND\n" +
+                expires_header +
+                "destination:/queue/exp\n" +
+                "\n" +
+                "message:" + msg + "\n")
+    }
+
+    put("1")
+    put("2", Some(1000L))
+    put("3")
+
+    Thread.sleep(2000)
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/exp\n" +
+              "id:1\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+
+    def get(dest: String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
+    }
+
+    get("1")
+    get("3")
+  }
+
+  test("Receipts on SEND to unconsummed topic") {
+    connect("1.1")
+
+    def put(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/topic/receipt-test\n" +
+                "receipt:" + id + "\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+
+    put(1)
+    put(2)
+    wait_for_receipt("1")
+    wait_for_receipt("2")
+
+
+  }
+
+  test("Receipts on SEND to a consumed topic") {
+    connect("1.1")
+
+    def put(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/topic/receipt-test\n" +
+                "receipt:" + id + "\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+
+    // start a consumer on a different connection
+    var consumer = new StompClient
+    connect("1.1", consumer)
+    consumer.write(
+      "SUBSCRIBE\n" +
+              "destination:/topic/receipt-test\n" +
+              "id:0\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0", consumer)
+
+    put(1)
+    put(2)
+    wait_for_receipt("1")
+    wait_for_receipt("2")
+
+  }
+
+  test("Transacted commit after unsubscribe") {
+    val producer = new StompClient
+    val consumer = new StompClient
+
+    connect("1.1", producer)
+    connect("1.1", consumer)
+
+    // subscribe the consumer
+    subscribe("0", "/queue/test", "client-individual", false, "", true, consumer)
+
+    // begin the transaction on the consumer
+    consumer.write(
+      "BEGIN\n" +
+              "transaction:x\n" +
+              "\n")
+
+    sync_send("/queue/test", "Hello world", "", producer)
+
+    val ack = assert_received("Hello world", "0", consumer, "x")
+    ack(true)
+
+    unsubscribe("0", "", consumer)
+
+    consumer.write(
+      "COMMIT\n" +
+              "transaction:x\n" +
+              "\n")
+
+    sync_send("/queue/test", "END", "", producer)
+    subscribe("1", "/queue/test", c = producer)
+    assert_received("END", "1", producer)
+    // since we committed the transaction AFTER un-subscribing, there should be nothing in
+    // the queue
+
+  }
+
+  test("Queue and a transacted send") {
+    connect("1.1")
+
+    def put(id: Int, tx: String = null) = {
+      client.write(
+        "SEND\n" +
+                "destination:/queue/transacted\n" + {
+          if (tx != null) {
+            "transaction:" + tx + "\n"
+          } else {
+            ""
+          }
+        } +
+                "\n" +
+                "message:" + id + "\n")
+    }
+
+    put(1)
+    client.write(
+      "BEGIN\n" +
+              "transaction:x\n" +
+              "\n")
+    put(2, "x")
+    put(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/transacted\n" +
+              "id:0\n" +
+              "\n")
+
+    def get(id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex ("\n\nmessage:" + id + "\n")
+    }
+    get(1)
+    get(3)
+
+    client.write(
+      "COMMIT\n" +
+              "transaction:x\n" +
+              "\n")
+
+    get(2)
+
+  }
+
+  test("Topic and a transacted send") {
+    connect("1.1")
+
+    def put(id: Int, tx: String = null) = {
+      client.write(
+        "SEND\n" +
+                "destination:/topic/transacted\n" + {
+          if (tx != null) {
+            "transaction:" + tx + "\n"
+          } else {
+            ""
+          }
+        } +
+                "\n" +
+                "message:" + id + "\n")
+    }
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/topic/transacted\n" +
+              "id:0\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    put(1)
+    client.write(
+      "BEGIN\n" +
+              "transaction:x\n" +
+              "\n")
+    put(2, "x")
+    put(3)
+
+    def get(id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith regex ("\n\nmessage:" + id + "\n")
+    }
+
+    get(1)
+    get(3)
+
+    client.write(
+      "COMMIT\n" +
+              "transaction:x\n" +
+              "\n")
+
+    get(2)
+
+  }
+
+  test("ack:client redelivers on client disconnect") {
+    connect("1.1")
+
+    def put(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/queue/ackmode-client\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+    put(1)
+    put(2)
+    put(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/ackmode-client\n" +
+              "ack:client\n" +
+              "id:0\n" +
+              "\n")
+
+    def get(id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should include("subscription:0\n")
+      frame should include regex ("message-id:.+?\n")
+      frame should endWith regex ("\n\nmessage:" + id + "\n")
+
+      val p = """(?s).*?\nmessage-id:(.+?)\n.*""".r
+      frame match {
+        case p(x) => x
+        case _ => null
+      }
+    }
+
+    get(1)
+    val mid = get(2)
+    get(3)
+
+    // Ack the first 2 messages..
+    client.write(
+      "ACK\n" +
+              "subscription:0\n" +
+              "message-id:" + mid + "\n" +
+              "receipt:0\n" +
+              "\n")
+
+    wait_for_receipt("0")
+    client.close
+
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/ackmode-client\n" +
+              "ack:client\n" +
+              "id:0\n" +
+              "\n")
+    get(3)
+
+
+  }
+
+
+  test("ack:client-individual redelivers on client disconnect") {
+    connect("1.1")
+
+    def put(id: Int) = {
+      client.write(
+        "SEND\n" +
+                "destination:/queue/ackmode-message\n" +
+                "\n" +
+                "message:" + id + "\n")
+    }
+    put(1)
+    put(2)
+    put(3)
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/ackmode-message\n" +
+              "ack:client-individual\n" +
+              "id:0\n" +
+              "\n")
+
+    def get(id: Int) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should include("subscription:0\n")
+      frame should include regex ("message-id:.+?\n")
+      frame should endWith regex ("\n\nmessage:" + id + "\n")
+
+      val p = """(?s).*?\nmessage-id:(.+?)\n.*""".r
+      frame match {
+        case p(x) => x
+        case _ => null
+      }
+    }
+
+    get(1)
+    val mid = get(2)
+    get(3)
+
+    // Ack the first 2 messages..
+    client.write(
+      "ACK\n" +
+              "subscription:0\n" +
+              "message-id:" + mid + "\n" +
+              "receipt:0\n" +
+              "\n")
+
+    wait_for_receipt("0")
+    client.close
+
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/ackmode-message\n" +
+              "ack:client-individual\n" +
+              "id:0\n" +
+              "\n")
+    get(1)
+    get(3)
+
+  }
+
+  test("Temp Queue Send Receive") {
+    connect("1.1")
+
+    def put(msg: String) = {
+      client.write(
+        "SEND\n" +
+                "destination:/temp-queue/test\n" +
+                "reply-to:/temp-queue/test\n" +
+                "receipt:0\n" +
+                "\n" +
+                "message:" + msg + "\n")
+      wait_for_receipt("0")
+    }
+
+    put("1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/temp-queue/test\n" +
+              "id:1\n" +
+              "\n")
+
+    def get(dest: String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
+
+      // extract headers as a map of values.
+      Map((frame.split("\n").reverse.flatMap {
+        line =>
+          if (line.contains(":")) {
+            val parts = line.split(":", 2)
+            Some((parts(0), parts(1)))
+          } else {
+            None
+          }
+      }): _*)
+    }
+
+    // The destination and reply-to headers should get updated with actual
+    // Queue names
+    val message = get("1")
+    val actual_temp_dest_name = message.get("destination").get
+    actual_temp_dest_name should startWith("/queue/temp.default.")
+    message.get("reply-to") should be === (message.get("destination"))
+
+    // Different connection should be able to send a message to the temp destination..
+    var other = new StompClient
+    connect("1.1", other)
+    other.write(
+      "SEND\n" +
+              "destination:" + actual_temp_dest_name + "\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0", other)
+
+    // First client chould get the message.
+    var frame = client.receive()
+    frame should startWith("MESSAGE\n")
+
+    // But not consume from it.
+    other.write(
+      "SUBSCRIBE\n" +
+              "destination:" + actual_temp_dest_name + "\n" +
+              "id:1\n" +
+              "receipt:0\n" +
+              "\n")
+    frame = other.receive()
+    frame should startWith("ERROR\n")
+    frame should include regex ("""message:Not authorized to receive from the temporary destination""")
+    other.close()
+
+    // Check that temp queue is deleted once the client disconnects
+    put("2")
+    expect(true)(queue_exists(actual_temp_dest_name.stripPrefix("/queue/")))
+    client.close();
+
+    within(10, SECONDS) {
+      expect(false)(queue_exists(actual_temp_dest_name.stripPrefix("/queue/")))
+    }
+  }
+
+  test("Temp Topic Send Receive") {
+    connect("1.1")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/temp-topic/test\n" +
+              "id:1\n" +
+              "\n")
+
+    def get(dest: String) = {
+      val frame = client.receive()
+      frame should startWith("MESSAGE\n")
+      frame should endWith("\n\nmessage:%s\n".format(dest))
+
+      // extract headers as a map of values.
+      Map((frame.split("\n").reverse.flatMap {
+        line =>
+          if (line.contains(":")) {
+            val parts = line.split(":", 2)
+            Some((parts(0), parts(1)))
+          } else {
+            None
+          }
+      }): _*)
+    }
+
+    def put(msg: String) = {
+      client.write(
+        "SEND\n" +
+                "destination:/temp-topic/test\n" +
+                "reply-to:/temp-topic/test\n" +
+                "receipt:0\n" +
+                "\n" +
+                "message:" + msg + "\n")
+      wait_for_receipt("0", client)
+    }
+    put("1")
+
+    // The destination and reply-to headers should get updated with actual
+    // Queue names
+    val message = get("1")
+    val actual_temp_dest_name = message.get("destination").get
+    actual_temp_dest_name should startWith("/topic/temp.default.")
+    message.get("reply-to") should be === (message.get("destination"))
+
+    // Different connection should be able to send a message to the temp destination..
+    var other = new StompClient
+    connect("1.1", other)
+    other.write(
+      "SEND\n" +
+              "destination:" + actual_temp_dest_name + "\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0", other)
+
+    // First client chould get the message.
+    var frame = client.receive()
+    frame should startWith("MESSAGE\n")
+
+    // But not consume from it.
+    other.write(
+      "SUBSCRIBE\n" +
+              "destination:" + actual_temp_dest_name + "\n" +
+              "id:1\n" +
+              "receipt:0\n" +
+              "\n")
+    frame = other.receive()
+    frame should startWith("ERROR\n")
+    frame should include regex ("""message:Not authorized to receive from the temporary destination""")
+    other.close()
+
+    // Check that temp queue is deleted once the client disconnects
+    put("2")
+    expect(true)(topic_exists(actual_temp_dest_name.stripPrefix("/topic/")))
+    client.close();
+
+    within(10, SECONDS) {
+      expect(false)(topic_exists(actual_temp_dest_name.stripPrefix("/topic/")))
+    }
+
+
+  }
+
+  test("Odd reply-to headers do not cause errors") {
+    connect("1.1")
+
+    client.write(
+      "SEND\n" +
+              "destination:/queue/oddrepyto\n" +
+              "reply-to:sms:8139993334444\n" +
+              "receipt:0\n" +
+              "\n")
+    wait_for_receipt("0")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/oddrepyto\n" +
+              "id:1\n" +
+              "\n")
+
+    val frame = client.receive()
+    frame should startWith("MESSAGE\n")
+    frame should include("reply-to:sms:8139993334444\n")
+  }
+
+  test("NACKing moves messages to DLQ (non-persistent)") {
+    connect("1.1")
+    sync_send("/queue/nacker.a", "this msg is not persistent")
+
+    subscribe("0", "/queue/nacker.a", "client", false, "", false)
+    subscribe("dlq", "/queue/dlq.nacker.a", "auto", false, "", false)
+    var ack = assert_received("this msg is not persistent", "0")
+    ack(false)
+    ack = assert_received("this msg is not persistent", "0")
+    ack(false)
+
+    // It should be sent to the DLQ after the 2nd nak
+    assert_received("this msg is not persistent", "dlq")
+  }
+
+  test("NACKing moves messages to DLQ (persistent)") {
+    connect("1.1")
+    sync_send("/queue/nacker.b", "this msg is persistent", "persistent:true\n")
+
+    subscribe("0", "/queue/nacker.b", "client", false, "", false)
+    subscribe("dlq", "/queue/dlq.nacker.b", "auto", false, "", false)
+    var ack = assert_received("this msg is persistent", "0")
+    ack(false)
+    ack = assert_received("this msg is persistent", "0")
+    ack(false)
+
+    // It should be sent to the DLQ after the 2nd nak
+    assert_received("this msg is persistent", "dlq")
+  }
+
+  test("NACKing without DLQ consumer (persistent)") {
+    connect("1.1")
+    sync_send("/queue/nacker.c", "this msg is persistent", "persistent:true\n")
+
+    subscribe("0", "/queue/nacker.c", "client", false, "", false)
+
+    var ack = assert_received("this msg is persistent", "0")
+    ack(false)
+    ack = assert_received("this msg is persistent", "0")
+    ack(false)
+    Thread.sleep(1000)
+  }
+
+
+}

Added: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSecurityTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSecurityTest.scala?rev=1366571&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSecurityTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSecurityTest.scala Sat Jul 28 00:25:43 2012
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp.test
+
+import java.lang.String
+
+class StompSecurityTest extends StompTestSupport {
+
+  override def broker_config_uri: String = "xml:classpath:apollo-stomp-secure.xml"
+
+  override def is_parallel_test_class: Boolean = false
+
+  override def beforeAll = {
+    try {
+      println("before: " + testName)
+      val login_file = new java.io.File(getClass.getClassLoader.getResource("login.config").getFile())
+      System.setProperty("java.security.auth.login.config", login_file.getCanonicalPath)
+    } catch {
+      case x: Throwable => x.printStackTrace
+    }
+    super.beforeAll
+  }
+
+  test("Connect with valid id password but can't connect") {
+
+    val frame = connect_request("1.1", client,
+      "login:can_not_connect\n" +
+              "passcode:can_not_connect\n")
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to connect")
+
+  }
+
+  test("Connect with no id password") {
+    val frame = connect_request("1.1", client)
+    frame should startWith("ERROR\n")
+    frame should include("message:Authentication failed.")
+  }
+
+  test("Connect with invalid id password") {
+    val frame = connect_request("1.1", client,
+      "login:foo\n" +
+              "passcode:bar\n")
+    frame should startWith("ERROR\n")
+    frame should include("message:Authentication failed.")
+
+  }
+
+  test("Connect with valid id password that can connect") {
+    connect("1.1", client,
+      "login:can_only_connect\n" +
+              "passcode:can_only_connect\n")
+
+  }
+
+  test("Connector restricted user on the right connector") {
+    connect("1.1", client,
+      "login:connector_restricted\n" +
+              "passcode:connector_restricted\n", "tcp2")
+  }
+
+  test("Connector restricted user on the wrong connector") {
+    val frame = connect_request("1.1", client,
+      "login:connector_restricted\n" +
+              "passcode:connector_restricted\n", "tcp")
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to connect to connector 'tcp'.")
+  }
+
+  test("Send not authorized") {
+    connect("1.1", client,
+      "login:can_only_connect\n" +
+              "passcode:can_only_connect\n")
+
+    client.write(
+      "SEND\n" +
+              "destination:/queue/secure\n" +
+              "receipt:0\n" +
+              "\n" +
+              "Hello Wolrd\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to create the queue")
+  }
+
+  test("Send authorized but not create") {
+    connect("1.1", client,
+      "login:can_send_queue\n" +
+              "passcode:can_send_queue\n")
+
+    client.write(
+      "SEND\n" +
+              "destination:/queue/secure\n" +
+              "receipt:0\n" +
+              "\n" +
+              "Hello Wolrd\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to create the queue")
+
+  }
+
+  test("Consume authorized but not create") {
+    connect("1.1", client,
+      "login:can_consume_queue\n" +
+              "passcode:can_consume_queue\n")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/secure\n" +
+              "id:0\n" +
+              "receipt:0\n" +
+              "\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to create the queue")
+  }
+
+  test("Send and create authorized") {
+    connect("1.1", client,
+      "login:can_send_create_queue\n" +
+              "passcode:can_send_create_queue\n")
+
+    client.write(
+      "SEND\n" +
+              "destination:/queue/secure\n" +
+              "receipt:0\n" +
+              "\n" +
+              "Hello Wolrd\n")
+
+    wait_for_receipt("0")
+
+  }
+
+  test("Send and create authorized via id_regex") {
+    connect("1.1", client,
+      "login:guest\n" +
+              "passcode:guest\n")
+
+    client.write(
+      "SEND\n" +
+              "destination:/queue/testblah\n" +
+              "receipt:0\n" +
+              "\n" +
+              "Hello Wolrd\n")
+
+    wait_for_receipt("0")
+
+    client.write(
+      "SEND\n" +
+              "destination:/queue/notmatch\n" +
+              "receipt:1\n" +
+              "\n" +
+              "Hello Wolrd\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to create the queue")
+  }
+
+  test("Can send and once created") {
+
+    // Now try sending with the lower access id.
+    connect("1.1", client,
+      "login:can_send_queue\n" +
+              "passcode:can_send_queue\n")
+
+    client.write(
+      "SEND\n" +
+              "destination:/queue/secure\n" +
+              "receipt:0\n" +
+              "\n" +
+              "Hello Wolrd\n")
+
+    wait_for_receipt("0")
+
+  }
+
+  test("Consume not authorized") {
+    connect("1.1", client,
+      "login:can_only_connect\n" +
+              "passcode:can_only_connect\n")
+
+    client.write(
+      "SUBSCRIBE\n" +
+              "destination:/queue/secure\n" +
+              "id:0\n" +
+              "receipt:0\n" +
+              "\n")
+
+    val frame = client.receive()
+    frame should startWith("ERROR\n")
+    frame should include("message:Not authorized to consume from the queue")
+  }
+
+  test("Consume authorized and JMSXUserID is set on message") {
+    connect("1.1", client,
+      "login:can_send_create_consume_queue\n" +
+              "passcode:can_send_create_consume_queue\n")
+
+    subscribe("0", "/queue/sendsid")
+    async_send("/queue/sendsid", "hello")
+
+    val frame = client.receive()
+    frame should startWith("MESSAGE\n")
+    frame should include("JMSXUserID:can_send_create_consume_queue\n")
+    frame should include("sender-ip:127.0.0.1\n")
+  }
+}

Added: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSerialTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSerialTest.scala?rev=1366571&view=auto
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSerialTest.scala (added)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompSerialTest.scala Sat Jul 28 00:25:43 2012
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.stomp.test
+
+import org.apache.activemq.apollo.broker._
+
+/**
+ * These tests seem to have trouble being run in Parallel
+ */
+class StompSerialTest extends StompTestSupport with BrokerParallelTestExecution {
+
+  // This is the test case for https://issues.apache.org/jira/browse/APLO-88
+  test("ACK then socket close with/without DISCONNECT, should still ACK") {
+    for (i <- 1 until 3) {
+      connect("1.1")
+
+      def send(id: Int) = {
+        client.write(
+          "SEND\n" +
+                  "destination:/queue/from-seq-end\n" +
+                  "message-id:id-" + i + "-" + id + "\n" +
+                  "receipt:0\n" +
+                  "\n")
+        wait_for_receipt("0")
+      }
+
+      def get(seq: Long) = {
+        val frame = client.receive()
+        frame should startWith("MESSAGE\n")
+        frame should include("message-id:id-" + i + "-" + seq + "\n")
+        client.write(
+          "ACK\n" +
+                  "subscription:0\n" +
+                  "message-id:id-" + i + "-" + seq + "\n" +
+                  "\n")
+      }
+
+      send(1)
+      send(2)
+
+      client.write(
+        "SUBSCRIBE\n" +
+                "destination:/queue/from-seq-end\n" +
+                "id:0\n" +
+                "ack:client\n" +
+                "\n")
+      get(1)
+      client.write(
+        "DISCONNECT\n" +
+                "\n")
+      client.close
+
+      connect("1.1")
+      client.write(
+        "SUBSCRIBE\n" +
+                "destination:/queue/from-seq-end\n" +
+                "id:0\n" +
+                "ack:client\n" +
+                "\n")
+      get(2)
+      client.close
+    }
+  }
+
+}