You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/07/22 00:17:52 UTC
svn commit: r1364182 -
/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Author: chirino
Date: Sat Jul 21 22:17:51 2012
New Revision: 1364182
URL: http://svn.apache.org/viewvc?rev=1364182&view=rev
Log:
Test case for APLO-218 : Unexpected "Transport listener failure" when you send a TXed ack after unsubscribing
Patch contributed by Christian Posta with slight modifications.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1364182&r1=1364181&r2=1364182&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Sat Jul 21 22:17:51 2012
@@ -125,7 +125,7 @@ class StompTestSupport extends BrokerFun
def unsubscribe(id:String, headers:String="", c: StompClient=client) = {
val rid = receipt_counter.incrementAndGet()
- client.write(
+ c.write(
"UNSUBSCRIBE\n" +
"id:"+id+"\n" +
"receipt:"+rid+"\n" +
@@ -134,7 +134,7 @@ class StompTestSupport extends BrokerFun
wait_for_receipt(""+rid, c)
}
- def assert_received(body:Any, sub:String=null, c: StompClient=client):(Boolean)=>Unit = {
+ def assert_received(body:Any, sub:String=null, c: StompClient=client, txid:String = null):(Boolean)=>Unit = {
val frame = c.receive()
frame should startWith("MESSAGE\n")
if(sub!=null) {
@@ -155,7 +155,9 @@ class StompTestSupport extends BrokerFun
(if(ack) "ACK\n" else "NACK\n") +
"subscription:"+sub+"\n" +
"message-id:"+msgid+"\n" +
- "\n")
+ (if(txid != null) "transaction:"+txid+"\n" else "") +
+
+ "\n")
}
}
@@ -1745,6 +1747,41 @@ class StompReceiptTest extends StompTest
}
class StompTransactionTest extends StompTestSupport {
+ test("Transacted commit after unsubscribe"){
+ val producer = new StompClient
+ val consumer = new StompClient
+
+ connect("1.1", producer)
+ connect("1.1", consumer)
+
+ // subscribe the consumer
+ subscribe("0", "/queue/test", "client-individual", false, "", true, consumer)
+
+ // begin the transaction on the consumer
+ consumer.write(
+ "BEGIN\n" +
+ "transaction:x\n" +
+ "\n")
+
+ sync_send("/queue/test", "Hello world", "", producer)
+
+ val ack = assert_received("Hello world", "0", consumer, "x")
+ ack(true)
+
+ unsubscribe("0", "", consumer)
+
+ consumer.write(
+ "COMMIT\n" +
+ "transaction:x\n" +
+ "\n")
+
+ sync_send("/queue/test", "END", "", producer)
+ subscribe("1", "/queue/test", c=producer)
+ assert_received("END", "1", producer)
+ // the transacted ACK was committed AFTER un-subscribing, so "Hello world" must be gone
+ // from the queue: the first message the new subscriber received above is "END"
+
+ }
test("Queue and a transacted send") {
connect("1.1")
@@ -1786,7 +1823,7 @@ class StompTransactionTest extends Stomp
"\n")
get(2)
-
+
}
test("Topic and a transacted send") {