You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/02/20 15:18:55 UTC
svn commit: r1448179 - in
/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test:
StompParallelTest.scala StompTestSupport.scala
Author: chirino
Date: Wed Feb 20 14:18:55 2013
New Revision: 1448179
URL: http://svn.apache.org/r1448179
Log:
Add more stomp transaction tests.
Modified:
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1448179&r1=1448178&r2=1448179&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Wed Feb 20 14:18:55 2013
@@ -1619,21 +1619,12 @@ class StompParallelTest extends StompTes
connect("1.1")
async_send(dest, "m1")
- client.write(
- "BEGIN\n" +
- "transaction:x\n" +
- "\n")
-
- async_send(dest, "t1", "transaction:x\n")
+ val tx = begin()
+ async_send(dest, "t1", "transaction:"+tx+"\n")
async_send(dest, "m2")
- async_send(dest, "t2", "transaction:x\n")
+ async_send(dest, "t2", "transaction:"+tx+"\n")
+ commit(tx)
- client.write(
- "COMMIT\n" +
- "transaction:x\n" +
- "receipt:0\n"+
- "\n")
- wait_for_receipt("0")
async_send(dest, "m3")
assert_received("m1",c=receiver)
@@ -1645,6 +1636,62 @@ class StompParallelTest extends StompTes
}
for( kind <- Array("/queue/", "/topic/", "/topic/queued.")) {
+ test("Transaction commit acks on "+kind) {
+
+ val dest = next_id(kind+"tx-commit-acks-")
+
+ val receiver = connect("1.1", new StompClient)
+ subscribe("mysub",dest,mode="client",c=receiver)
+
+ connect("1.1")
+
+ async_send(dest, "m1")
+ async_send(dest, "m2")
+ async_send(dest, "m3")
+ async_send(dest, "m4")
+
+ val tx = begin(c=receiver)
+ assert_received("m1",c=receiver)(true)
+ assert_received("m2",c=receiver, txid=tx)(true)
+ assert_received("m3",c=receiver)(true)
+ assert_received("m4",c=receiver, txid=tx)(true)
+ commit(tx, c=receiver)
+
+ async_send(dest, "m5")
+ assert_received("m5",c=receiver)
+ }
+ }
+
+ for( kind <- Array("/queue/", "/topic/", "/topic/queued.")) {
+ test("Transaction abort acks on "+kind) {
+
+ val dest = next_id(kind+"tx-abort-acks-")
+
+ val receiver = connect("1.1", new StompClient)
+ subscribe("mysub",dest,mode="client",c=receiver)
+
+ connect("1.1")
+
+ async_send(dest, "m1")
+ async_send(dest, "m2")
+ async_send(dest, "m3")
+ async_send(dest, "m4")
+
+ val tx = begin(c=receiver)
+ assert_received("m1",c=receiver)(true)
+ assert_received("m2",c=receiver, txid=tx)(true)
+ assert_received("m3",c=receiver)(true)
+ assert_received("m4",c=receiver, txid=tx)(true)
+ abort(tx, c=receiver)
+
+ // aborting a tx does not cause a redelivery to occur.
+ async_send(dest, "m5")
+ assert_received("m5",c=receiver)
+ }
+ }
+
+
+ for( kind <- Array("/queue/", "/topic/", "/topic/queued.")) {
test("Sending already expired message to "+kind) {
val dest = next_id(kind+"expired-")
@@ -1663,4 +1710,5 @@ class StompParallelTest extends StompTes
assert_received("m2",c=receiver)
}
}
+
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala?rev=1448179&r1=1448178&r2=1448179&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala Wed Feb 20 14:18:55 2013
@@ -104,6 +104,38 @@ class StompTestSupport extends BrokerFun
body)
}
+ def begin(txid: String="x", c: StompClient = client) = {
+ c.write(
+ "BEGIN\n" +
+ "transaction:"+txid+"\n" +
+ "\n")
+ txid
+ }
+
+ def abort(txid: String="x", sync:Boolean=true, c: StompClient = client) = {
+ val rid = receipt_counter.incrementAndGet()
+ c.write(
+ "ABORT\n" +
+ "transaction:"+txid+"\n" +
+ (if (sync) "receipt:" + rid + "\n" else "") +
+ "\n")
+ if (sync) {
+ wait_for_receipt("" + rid, c)
+ }
+ }
+
+ def commit(txid: String="x", sync:Boolean=true, c: StompClient = client) = {
+ val rid = receipt_counter.incrementAndGet()
+ c.write(
+ "COMMIT\n" +
+ "transaction:"+txid+"\n" +
+ (if (sync) "receipt:" + rid + "\n" else "") +
+ "\n")
+ if (sync) {
+ wait_for_receipt("" + rid, c)
+ }
+ }
+
def subscribe(id: String, dest: String, mode: String = "auto", persistent: Boolean = false, headers: String = "", sync: Boolean = true, c: StompClient = client) = {
val rid = receipt_counter.incrementAndGet()
c.write(