You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/06/26 17:23:31 UTC
svn commit: r1354062 - in
/activemq/activemq-apollo/trunk/apollo-stomp/src/test:
resources/apollo-stomp.xml
scala/org/apache/activemq/apollo/stomp/StompTest.scala
Author: chirino
Date: Tue Jun 26 15:23:30 2012
New Revision: 1354062
URL: http://svn.apache.org/viewvc?rev=1354062&view=rev
Log:
Tests for APLO-217 : Seems like messages are getting delivered fine even if client disconnects before messages can be delivered to destinations.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml?rev=1354062&r1=1354061&r2=1354062&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/resources/apollo-stomp.xml Tue Jun 26 15:23:30 2012
@@ -21,6 +21,11 @@
<virtual_host id="default">
<host_name>localhost</host_name>
+ <queue id="quota.**" quota="10k"/>
+ <topic id="quota.**" slow_consumer_policy="queue">
+ <subscription quota="10k"/>
+ </topic>
+
<queue id="nacker.**" dlq="dlq.*" nak_limit="2"/>
<queue id="mirrored.**" mirrored="true"/>
<topic id="queued.**" slow_consumer_policy="queue">
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1354062&r1=1354061&r2=1354062&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Tue Jun 26 15:23:30 2012
@@ -23,11 +23,11 @@ import java.util.concurrent.TimeUnit._
import org.apache.activemq.apollo.util._
import java.util.concurrent.atomic.AtomicLong
import FileSupport._
-import java.net.InetSocketAddress
import java.nio.channels.DatagramChannel
import org.fusesource.hawtbuf.AsciiBuffer
import org.apache.activemq.apollo.broker._
import org.apache.activemq.apollo.dto.{TopicStatusDTO, KeyStorageDTO}
+import java.net.{SocketTimeoutException, InetSocketAddress}
class StompTestSupport extends BrokerFunSuiteSupport with ShouldMatchers with BeforeAndAfterEach {
@@ -599,6 +599,92 @@ class StompPersistentQueueTest extends S
}
+/**
+ * These disconnect tests assure that we don't drop message deliviers that are in flight
+ * if a client disconnects before those deliveries are accepted by the target destination.
+ */
+class StompDisconnectTest extends StompTestSupport {
+
+ test("Messages delivery assured to a queued once a disconnect receipt is received") {
+
+ // figure out at what point a quota'ed queue stops accepting more messages.
+ connect("1.1")
+ client.socket.setSoTimeout(1*1000)
+ var block_count = 0
+ try {
+ while( true ) {
+ sync_send("/queue/quota.assured1", "%01024d".format(block_count))
+ block_count += 1
+ }
+ } catch{
+ case e:SocketTimeoutException =>
+ }
+ close()
+
+ // Send 5 more messages which do not fit in the queue, they will be
+ // held in the producer connection's delivery session buffer..
+ connect("1.1")
+ for(i <- 0 until (block_count+5)) {
+ async_send("/queue/quota.assured2", "%01024d".format(i))
+ }
+
+ // Even though we disconnect, those 5 that did not fit should still
+ // get delivered once the queue unblocks..
+ disconnect()
+
+ // Lets make sure non of the messages were dropped.
+ connect("1.1")
+ subscribe("0", "/queue/quota.assured2")
+ for(i <- 0 until (block_count+5)) {
+ assert_received("%01024d".format(i))
+ }
+
+ }
+
+ test("Messages delivery assured to a topic once a disconnect receipt is received") {
+
+ //setup a subscription which will block quickly..
+ var consumer = new StompClient
+ connect("1.1", consumer)
+ subscribe("0", "/topic/quota.assured1", "client", headers="credit:1,0\n", c=consumer)
+
+ // figure out at what point a quota'ed consumer stops accepting more messages.
+ connect("1.1")
+ client.socket.setSoTimeout(1*1000)
+ var block_count = 0
+ try {
+ while( true ) {
+ sync_send("/topic/quota.assured1", "%01024d".format(block_count))
+ block_count += 1
+ }
+ } catch{
+ case e:SocketTimeoutException =>
+ }
+ close()
+ close(consumer)
+
+ connect("1.1", consumer)
+ subscribe("0", "/topic/quota.assured2", "client", headers="credit:1,0\n", c=consumer)
+
+ // Send 5 more messages which do not fit in the consumer buffer, they will be
+ // held in the producer connection's delivery session buffer..
+ connect("1.1")
+ for(i <- 0 until (block_count+5)) {
+ async_send("/topic/quota.assured2", "%01024d".format(i))
+ }
+
+ // Even though we disconnect, those 5 that did not fit should still
+ // get delivered once the queue unblocks..
+ disconnect()
+
+ // Lets make sure non of the messages were dropped.
+ for(i <- 0 until (block_count+5)) {
+ assert_received("%01024d".format(i), c=consumer)(true)
+ }
+
+ }
+}
+
class StompDestinationTest extends StompTestSupport {
test("APLO-206 - Load balance of job queues using small consumer credit windows") {