You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/03/19 16:55:34 UTC

svn commit: r1458359 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/

Author: chirino
Date: Tue Mar 19 15:55:34 2013
New Revision: 1458359

URL: http://svn.apache.org/r1458359
Log:
Fixes APLO-310: Wildcard durable sub would not match topics created after the durable sub was set up.

Add a wildcard durable sub test case to verify.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1458359&r1=1458358&r2=1458359&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Tue Mar 19 15:55:34 2013
@@ -531,7 +531,7 @@ class LocalRouter(val virtual_host:Virtu
 
         // Disconnect the durable subs
         for( dsub <- dest.durable_subscriptions ) {
-          dest.unbind_durable_subscription(dsub)
+          dest.unbind(dsub, false)
         }
 
 //        // Delete any consumer temp queues..
@@ -639,22 +639,13 @@ class LocalRouter(val virtual_host:Virtu
 
     def unbind_topics(queue: Queue, topics: Traversable[_ <: BindAddress]) {
       topics.foreach { topic:BindAddress =>
-        var matches = local_topic_domain.get_destination_matches(topic.path)
-        matches.foreach(_.unbind_durable_subscription(queue))
+        topic_domain.unbind(topic, queue, false, null)
       }
     }
 
     def bind_topics(queue: Queue, address: SubscriptionAddress, topics: Traversable[_ <: BindAddress]) {
       topics.foreach { topic:BindAddress =>
-        val wildcard = PathParser.containsWildCards(topic.path)
-        var matches = local_topic_domain.get_destination_matches(topic.path)
-
-        // We may need to create the topic...
-        if (!wildcard && matches.isEmpty) {
-          local_topic_domain.create_destination(topic, null)
-          matches = local_topic_domain.get_destination_matches(topic.path)
-        }
-        matches.foreach(_.bind_durable_subscription(address, queue))
+        topic_domain.bind(topic, queue, null, ()=>{})
       }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1458359&r1=1458358&r2=1458359&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Tue Mar 19 15:55:34 2013
@@ -441,11 +441,18 @@ class Topic(val router:LocalRouter, val 
       }
     }
 
-    val target = address.domain match {
-      case "queue" | "dsub"=>
-        // durable sub or mirrored queue case.
+    val target = consumer match {
+
+      // Consumer might be a durable sub.
+      case queue:Queue =>
+        if( !durable_subscriptions.contains(queue) ) {
+          durable_subscriptions += queue
+        }
         consumer
-      case "topic"=>
+
+      case _ =>
+
+
         slow_consumer_policy match {
           case "queue" =>
 
@@ -558,24 +565,18 @@ class Topic(val router:LocalRouter, val 
             List()
         }
     }
-    for( producer <- producers.keys ) {
-     producer.unbind(list)
-    }
-    check_idle
-  }
 
-  def bind_durable_subscription(address: SubscriptionAddress, queue:Queue)  = {
-    if( !durable_subscriptions.contains(queue) ) {
-      durable_subscriptions += queue
-      bind(address, queue, ()=>{})
+    // consumer might be a durable sub..
+    consumer match {
+      case queue:Queue =>
+        if( durable_subscriptions.contains(queue) ) {
+          durable_subscriptions -= queue
+        }
+      case _ =>
     }
-    check_idle
-  }
 
-  def unbind_durable_subscription(queue:Queue)  = {
-    if( durable_subscriptions.contains(queue) ) {
-      unbind(queue, false)
-      durable_subscriptions -= queue
+    for( producer <- producers.keys ) {
+     producer.unbind(list)
     }
     check_idle
   }

Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1458359&r1=1458358&r2=1458359&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Tue Mar 19 15:55:34 2013
@@ -842,6 +842,27 @@ class StompParallelTest extends StompTes
     assert_received(3, "my-sub-name")
   }
 
+  test("Topic /w Wildcard durable sub retains messages.") {
+    connect("1.1")
+    val dest = next_id("/topic/dsub_test_")
+    subscribe("my-sub-name", dest+".*", persistent=true)
+    client.close
+
+    // Close him out.. since persistent:true then
+    // the topic subscription will be persistent across client
+    // connections.
+    connect("1.1")
+    async_send(dest+".1", 1)
+    async_send(dest+".2", 2)
+    async_send(dest+".3", 3)
+
+    subscribe("my-sub-name", dest, persistent=true)
+
+    assert_received(1, "my-sub-name")
+    assert_received(2, "my-sub-name")
+    assert_received(3, "my-sub-name")
+  }
+
   test("Queue and a selector") {
     connect("1.1")