You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/06/22 17:37:33 UTC
svn commit: r1352929 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Author: chirino
Date: Fri Jun 22 15:37:32 2012
New Revision: 1352929
URL: http://svn.apache.org/viewvc?rev=1352929&view=rev
Log:
Fixes APLO-210: Sending persistent messages to a durable subscription would eventually lock up producers.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1352929&r1=1352928&r2=1352929&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Fri Jun 22 15:37:32 2012
@@ -301,8 +301,6 @@ abstract class DeliveryProducerRoute(rou
if (copy.uow == null) {
copy.uow = store.create_uow
- } else {
- copy.uow.retain
}
if( copy.storeKey == -1L ) {
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala?rev=1352929&r1=1352928&r2=1352929&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/StompTest.scala Fri Jun 22 15:37:32 2012
@@ -72,6 +72,8 @@ class StompTestSupport extends BrokerFun
c
}
+ def close(c: StompClient = client) = c.close()
+
val receipt_counter = new AtomicLong()
def sync_send(dest:String, body:Any, headers:String="", c:StompClient = client) = {
@@ -1243,6 +1245,37 @@ class DurableSubscriptionOnLevelDBTest e
override def broker_config_uri: String = "xml:classpath:apollo-stomp-leveldb.xml"
+ test("Multiple dsubs contain the same messages (Test case for APLO-210)") {
+
+ val sub_count = 3
+ val message_count = 1000
+
+ // establish 3 durable subs..
+ connect("1.1")
+ for( sub <- 1 to sub_count ) {
+ subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true)
+ }
+ close()
+
+ connect("1.1")
+
+ val filler = ":"+("x"*(1024*10))
+
+ // Now send a bunch of messages....
+ for( i <- 1 to message_count ) {
+ async_send(dest="/topic/sometopic", headers="persistent:true\n", body=i+filler)
+ }
+
+ // Empty out each durable sub
+ for( sub <- 1 to sub_count ) {
+ subscribe(id="sub"+sub, dest="/topic/sometopic", persistent=true, sync=false)
+ for( i <- 1 to message_count ) {
+ assert_received(body=i+filler, sub="sub"+sub)
+ }
+ }
+
+ }
+
test("Can directly send an recieve from a durable sub") {
connect("1.1")
@@ -1420,78 +1453,6 @@ class DurableSubscriptionOnLevelDBTest e
}
- test("Two durable subs contain the same messages") {
- connect("1.1")
-
- // establish 2 durable subs..
- client.write(
- "SUBSCRIBE\n" +
- "destination:/topic/sometopic\n" +
- "id:sub1\n" +
- "persistent:true\n" +
- "receipt:0\n" +
- "\n")
- wait_for_receipt("0")
-
- client.write(
- "SUBSCRIBE\n" +
- "destination:/topic/sometopic\n" +
- "id:sub2\n" +
- "persistent:true\n" +
- "receipt:0\n" +
- "\n")
- wait_for_receipt("0")
-
- client.close
- connect("1.1")
-
- // Now send a bunch of messages....
- def put(id:Int) = {
- client.write(
- "SEND\n" +
- "destination:/topic/sometopic\n" +
- "\n" +
- "message:"+id+"\n")
- }
-
- for( i <- 1 to 1000 ) {
- put(i)
- }
-
- // Now try to get all the previously sent messages.
-
- def get(id:Int) = {
- val frame = client.receive()
- frame should startWith("MESSAGE\n")
- frame should endWith regex("\n\nmessage:"+id+"\n")
- }
-
- // Empty out the first durable sub
- client.write(
- "SUBSCRIBE\n" +
- "destination:/topic/sometopic\n" +
- "id:sub1\n" +
- "persistent:true\n" +
- "\n")
-
- for( i <- 1 to 1000 ) {
- get(i)
- }
-
- // Empty out the 2nd durable sub
- client.write(
- "SUBSCRIBE\n" +
- "destination:/topic/sometopic\n" +
- "id:sub2\n" +
- "persistent:true\n" +
- "\n")
-
- for( i <- 1 to 1000 ) {
- get(i)
- }
-
- }
-
test("Direct send to a non-existant a durable sub fails") {
connect("1.1")