You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/02/20 15:18:55 UTC

svn commit: r1448179 - in /activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test: StompParallelTest.scala StompTestSupport.scala

Author: chirino
Date: Wed Feb 20 14:18:55 2013
New Revision: 1448179

URL: http://svn.apache.org/r1448179
Log:
Add more stomp transaction tests.

Modified:
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1448179&r1=1448178&r2=1448179&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Wed Feb 20 14:18:55 2013
@@ -1619,21 +1619,12 @@ class StompParallelTest extends StompTes
       connect("1.1")
       async_send(dest, "m1")
 
-      client.write(
-        "BEGIN\n" +
-        "transaction:x\n" +
-        "\n")
-
-      async_send(dest, "t1", "transaction:x\n")
+      val tx = begin()
+      async_send(dest, "t1", "transaction:"+tx+"\n")
       async_send(dest, "m2")
-      async_send(dest, "t2", "transaction:x\n")
+      async_send(dest, "t2", "transaction:"+tx+"\n")
+      commit(tx)
 
-      client.write(
-        "COMMIT\n" +
-        "transaction:x\n" +
-        "receipt:0\n"+
-        "\n")
-      wait_for_receipt("0")
       async_send(dest, "m3")
 
       assert_received("m1",c=receiver)
@@ -1645,6 +1636,62 @@ class StompParallelTest extends StompTes
   }
 
   for( kind <- Array("/queue/", "/topic/", "/topic/queued.")) {
+    test("Transaction commit acks on "+kind) {
+
+      val dest = next_id(kind+"tx-commit-acks-")
+
+      val receiver = connect("1.1", new StompClient)
+      subscribe("mysub",dest,mode="client",c=receiver)
+
+      connect("1.1")
+
+      async_send(dest, "m1")
+      async_send(dest, "m2")
+      async_send(dest, "m3")
+      async_send(dest, "m4")
+
+      val tx = begin(c=receiver)
+      assert_received("m1",c=receiver)(true)
+      assert_received("m2",c=receiver, txid=tx)(true)
+      assert_received("m3",c=receiver)(true)
+      assert_received("m4",c=receiver, txid=tx)(true)
+      commit(tx, c=receiver)
+
+      async_send(dest, "m5")
+      assert_received("m5",c=receiver)
+    }
+  }
+
+  for( kind <- Array("/queue/", "/topic/", "/topic/queued.")) {
+    test("Transaction abort acks on "+kind) {
+
+      val dest = next_id(kind+"tx-abort-acks-")
+
+      val receiver = connect("1.1", new StompClient)
+      subscribe("mysub",dest,mode="client",c=receiver)
+
+      connect("1.1")
+
+      async_send(dest, "m1")
+      async_send(dest, "m2")
+      async_send(dest, "m3")
+      async_send(dest, "m4")
+
+      val tx = begin(c=receiver)
+      assert_received("m1",c=receiver)(true)
+      assert_received("m2",c=receiver, txid=tx)(true)
+      assert_received("m3",c=receiver)(true)
+      assert_received("m4",c=receiver, txid=tx)(true)
+      abort(tx, c=receiver)
+
+      // aborting a tx does not cause a redelivery to occur.
+      async_send(dest, "m5")
+      assert_received("m5",c=receiver)
+    }
+  }
+
+
+  for( kind <- Array("/queue/", "/topic/", "/topic/queued.")) {
     test("Sending already expired message to "+kind) {
 
       val dest = next_id(kind+"expired-")
@@ -1663,4 +1710,5 @@ class StompParallelTest extends StompTes
       assert_received("m2",c=receiver)
     }
   }
+
 }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala?rev=1448179&r1=1448178&r2=1448179&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompTestSupport.scala Wed Feb 20 14:18:55 2013
@@ -104,6 +104,38 @@ class StompTestSupport extends BrokerFun
               body)
   }
 
+  def begin(txid: String="x", c: StompClient = client) = {
+    c.write(
+      "BEGIN\n" +
+      "transaction:"+txid+"\n" +
+      "\n")
+    txid
+  }
+
+  def abort(txid: String="x", sync:Boolean=true, c: StompClient = client) = {
+    val rid = receipt_counter.incrementAndGet()
+    c.write(
+      "ABORT\n" +
+      "transaction:"+txid+"\n" +
+      (if (sync) "receipt:" + rid + "\n" else "") +
+      "\n")
+    if (sync) {
+      wait_for_receipt("" + rid, c)
+    }
+  }
+
+  def commit(txid: String="x", sync:Boolean=true, c: StompClient = client) = {
+    val rid = receipt_counter.incrementAndGet()
+    c.write(
+      "COMMIT\n" +
+      "transaction:"+txid+"\n" +
+      (if (sync) "receipt:" + rid + "\n" else "") +
+      "\n")
+    if (sync) {
+      wait_for_receipt("" + rid, c)
+    }
+  }
+
   def subscribe(id: String, dest: String, mode: String = "auto", persistent: Boolean = false, headers: String = "", sync: Boolean = true, c: StompClient = client) = {
     val rid = receipt_counter.incrementAndGet()
     c.write(