You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/06/26 17:23:31 UTC

svn commit: r1354062 - in /activemq/activemq-apollo/trunk/apollo-stomp/src/test: resources/apollo-stomp.xml scala/org/apache/activemq/apollo/stomp/StompTest.scala

Author: chirino
Date: Tue Jun 26 15:23:30 2012
New Revision: 1354062

URL: http://svn.apache.org/viewvc?rev=1354062&view=rev
Log:
Tests for APLO-217 : Seems like messages are getting delivered fine even if client disconnects before messages can be delivered to destinations.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1354062&r1=1354061&r2=1354062&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Tue Jun 26 15:23:30 2012
@@ -21,6 +21,11 @@
   <virtual_host id="default">
     <host_name>localhost</host_name>
 
+    <queue id="quota.**" quota="10k"/>
+    <topic id="quota.**" slow_consumer_policy="queue">
+      <subscription quota="10k"/>
+    </topic>
+
     <queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
     <queue id="mirrored.**" mirrored="true"/>
     <topic id="queued.**" slow_consumer_policy="queue">

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1354062&r1=1354061&r2=1354062&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Tue Jun 26 15:23:30 2012
@@ -23,11 +23,11 @@ import java.util.concurrent.TimeUnit._
 import org.apache.activemq.apollo.util._
 import java.util.concurrent.atomic.AtomicLong
 import FileSupport._
-import java.net.InetSocketAddress
 import java.nio.channels.DatagramChannel
 import org.fusesource.hawtbuf.AsciiBuffer
 import org.apache.activemq.apollo.broker._
 import org.apache.activemq.apollo.dto.{TopicStatusDTO, KeyStorageDTO}
+import java.net.{SocketTimeoutException, InetSocketAddress}
 
 class StompTestSupport extends BrokerFunSuiteSupport with ShouldMatchers with BeforeAndAfterEach {
 
@@ -599,6 +599,92 @@ class StompPersistentQueueTest extends S
 
 }
 
+/**
+ * These disconnect tests ensure that we don't drop message deliveries that are in flight
+ * if a client disconnects before those deliveries are accepted by the target destination.
+ */
+class StompDisconnectTest extends StompTestSupport {
+
+  test("Messages delivery assured to a queued once a disconnect receipt is received") {
+
+    // figure out at what point a quota'ed queue stops accepting more messages.
+    connect("1.1")
+    client.socket.setSoTimeout(1*1000)
+    var block_count = 0
+    try {
+      while( true ) {
+        sync_send("/queue/quota.assured1", "%01024d".format(block_count))
+        block_count += 1
+      }
+    } catch{
+      case e:SocketTimeoutException =>
+    }
+    close()
+
+    // Send block_count+5 messages: the last 5 do not fit in the queue, so they
+    // will be held in the producer connection's delivery session buffer.
+    connect("1.1")
+    for(i <- 0 until (block_count+5)) {
+      async_send("/queue/quota.assured2", "%01024d".format(i))
+    }
+
+    // Even though we disconnect, those 5 that did not fit should still
+    // get delivered once the queue unblocks..
+    disconnect()
+
+    // Let's make sure none of the messages were dropped.
+    connect("1.1")
+    subscribe("0", "/queue/quota.assured2")
+    for(i <- 0 until (block_count+5)) {
+      assert_received("%01024d".format(i))
+    }
+
+  }
+
+  test("Messages delivery assured to a topic once a disconnect receipt is received") {
+
+    // Set up a subscription which will block quickly.
+    var consumer = new StompClient
+    connect("1.1", consumer)
+    subscribe("0", "/topic/quota.assured1", "client", headers="credit:1,0\n", c=consumer)
+
+    // figure out at what point a quota'ed consumer stops accepting more messages.
+    connect("1.1")
+    client.socket.setSoTimeout(1*1000)
+    var block_count = 0
+    try {
+      while( true ) {
+        sync_send("/topic/quota.assured1", "%01024d".format(block_count))
+        block_count += 1
+      }
+    } catch{
+      case e:SocketTimeoutException =>
+    }
+    close()
+    close(consumer)
+
+    connect("1.1", consumer)
+    subscribe("0", "/topic/quota.assured2", "client", headers="credit:1,0\n", c=consumer)
+
+    // Send block_count+5 messages: the last 5 do not fit in the consumer buffer, so
+    // they will be held in the producer connection's delivery session buffer.
+    connect("1.1")
+    for(i <- 0 until (block_count+5)) {
+      async_send("/topic/quota.assured2", "%01024d".format(i))
+    }
+
+    // Even though we disconnect, those 5 that did not fit should still
+    // get delivered once the queue unblocks..
+    disconnect()
+
+    // Let's make sure none of the messages were dropped.
+    for(i <- 0 until (block_count+5)) {
+      assert_received("%01024d".format(i), c=consumer)(true)
+    }
+
+  }
+}
+
 class StompDestinationTest extends StompTestSupport {
 
   test("APLO-206 - Load balance of job queues using small consumer credit windows") {