You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/06/22 17:37:33 UTC

svn commit: r1352929 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Author: chirino
Date: Fri Jun 22 15:37:32 2012
New Revision: 1352929

URL: http://svn.apache.org/viewvc?rev=1352929&view=rev
Log:
Fixes APLO-210: Sending persistent messages to a durable subscription would eventually lock up producers.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1352929&r1=1352928&r2=1352929&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri Jun 22 15:37:32 2012
@@ -301,8 +301,6 @@ abstract class DeliveryProducerRoute(rou
 
             if (copy.uow == null) {
               copy.uow = store.create_uow
-            } else {
-              copy.uow.retain
             }
 
             if( copy.storeKey == -1L ) {

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1352929&r1=1352928&r2=1352929&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Fri Jun 22 15:37:32 2012
@@ -72,6 +72,8 @@ class StompTestSupport extends BrokerFun
     c
   }
 
+  def close(c: StompClient = client) = c.close()
+
   val receipt_counter = new AtomicLong()
 
   def sync_send(dest:String, body:Any, headers:String="", c:StompClient = client) = {
@@ -1243,6 +1245,37 @@ class DurableSubscriptionOnLevelDBTest e
 
   override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
 
+  test("Multiple dsubs contain the same messages (Test case for APLO-210)") {
+
+    val sub_count = 3
+    val message_count = 1000
+
+    // establish 3 durable subs..
+    connect("1.1")
+    for( sub <- 1 to sub_count ) {
+      subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true)
+    }
+    close()
+
+    connect("1.1")
+
+    val filler = ":"+("x"*(1024*10))
+
+    // Now send a bunch of messages....
+    for( i <- 1 to message_count ) {
+      async_send(dest="/topic/sometopic", headers="persistent:true\n", body=i+filler)
+    }
+
+    // Empty out each of the durable subs
+    for( sub <- 1 to sub_count ) {
+      subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true, sync=false)
+      for( i <- 1 to message_count ) {
+        assert_received(body=i+filler, sub="sub"+sub)
+      }
+    }
+
+  }
+
   test("Can directly send an recieve from a durable sub") {
     connect("1.1")
 
@@ -1420,78 +1453,6 @@ class DurableSubscriptionOnLevelDBTest e
 
   }
 
-  test("Two durable subs contain the same messages") {
-    connect("1.1")
-
-    // establish 2 durable subs..
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/topic/sometopic\n" +
-      "id:sub1\n" +
-      "persistent:true\n" +
-      "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0")
-
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/topic/sometopic\n" +
-      "id:sub2\n" +
-      "persistent:true\n" +
-      "receipt:0\n" +
-      "\n")
-    wait_for_receipt("0")
-
-    client.close
-    connect("1.1")
-
-    // Now send a bunch of messages....
-    def put(id:Int) = {
-      client.write(
-        "SEND\n" +
-        "destination:/topic/sometopic\n" +
-        "\n" +
-        "message:"+id+"\n")
-    }
-
-    for( i <- 1 to 1000 ) {
-      put(i)
-    }
-
-    // Now try to get all the previously sent messages.
-
-    def get(id:Int) = {
-      val frame = client.receive()
-      frame should startWith("MESSAGE\n")
-      frame should endWith regex("\n\nmessage:"+id+"\n")
-    }
-
-    // Empty out the first durable sub
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/topic/sometopic\n" +
-      "id:sub1\n" +
-      "persistent:true\n" +
-      "\n")
-
-    for( i <- 1 to 1000 ) {
-      get(i)
-    }
-
-    // Empty out the 2nd durable sub
-    client.write(
-      "SUBSCRIBE\n" +
-      "destination:/topic/sometopic\n" +
-      "id:sub2\n" +
-      "persistent:true\n" +
-      "\n")
-
-    for( i <- 1 to 1000 ) {
-      get(i)
-    }
-
-  }
-
   test("Direct send to a non-existant a durable sub fails") {
     connect("1.1")