You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/03/19 16:55:34 UTC
svn commit: r1458359 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/
Author: chirino
Date: Tue Mar 19 15:55:34 2013
New Revision: 1458359
URL: http://svn.apache.org/r1458359
Log:
Fixes APLO-310: Wildcard durable sub would match topics created after the durable sub was setup.
Add a wildcard durable sub test case to verify.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1458359&r1=1458358&r2=1458359&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Tue Mar 19 15:55:34 2013
@@ -531,7 +531,7 @@ class LocalRouter(val virtual_host:Virtu
// Disconnect the durable subs
for( dsub <- dest.durable_subscriptions ) {
- dest.unbind_durable_subscription(dsub)
+ dest.unbind(dsub, false)
}
// // Delete any consumer temp queues..
@@ -639,22 +639,13 @@ class LocalRouter(val virtual_host:Virtu
def unbind_topics(queue: Queue, topics: Traversable[_ <: BindAddress]) {
topics.foreach { topic:BindAddress =>
- var matches = local_topic_domain.get_destination_matches(topic.path)
- matches.foreach(_.unbind_durable_subscription(queue))
+ topic_domain.unbind(topic, queue, false, null)
}
}
def bind_topics(queue: Queue, address: SubscriptionAddress, topics: Traversable[_ <: BindAddress]) {
topics.foreach { topic:BindAddress =>
- val wildcard = PathParser.containsWildCards(topic.path)
- var matches = local_topic_domain.get_destination_matches(topic.path)
-
- // We may need to create the topic...
- if (!wildcard && matches.isEmpty) {
- local_topic_domain.create_destination(topic, null)
- matches = local_topic_domain.get_destination_matches(topic.path)
- }
- matches.foreach(_.bind_durable_subscription(address, queue))
+ topic_domain.bind(topic, queue, null, ()=>{})
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1458359&r1=1458358&r2=1458359&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Tue Mar 19 15:55:34 2013
@@ -441,11 +441,18 @@ class Topic(val router:LocalRouter, val
}
}
- val target = address.domain match {
- case "queue" | "dsub"=>
- // durable sub or mirrored queue case.
+ val target = consumer match {
+
+ // Consumer might be a durable sub.
+ case queue:Queue =>
+ if( !durable_subscriptions.contains(queue) ) {
+ durable_subscriptions += queue
+ }
consumer
- case "topic"=>
+
+ case _ =>
+
+
slow_consumer_policy match {
case "queue" =>
@@ -558,24 +565,18 @@ class Topic(val router:LocalRouter, val
List()
}
}
- for( producer <- producers.keys ) {
- producer.unbind(list)
- }
- check_idle
- }
- def bind_durable_subscription(address: SubscriptionAddress, queue:Queue) = {
- if( !durable_subscriptions.contains(queue) ) {
- durable_subscriptions += queue
- bind(address, queue, ()=>{})
+ // consumer might be a durable sub..
+ consumer match {
+ case queue:Queue =>
+ if( durable_subscriptions.contains(queue) ) {
+ durable_subscriptions -= queue
+ }
+ case _ =>
}
- check_idle
- }
- def unbind_durable_subscription(queue:Queue) = {
- if( durable_subscriptions.contains(queue) ) {
- unbind(queue, false)
- durable_subscriptions -= queue
+ for( producer <- producers.keys ) {
+ producer.unbind(list)
}
check_idle
}
Modified: activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala?rev=1458359&r1=1458358&r2=1458359&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-stomp/src/test/scala/org/apache/activemq/apollo/stomp/test/StompParallelTest.scala Tue Mar 19 15:55:34 2013
@@ -842,6 +842,27 @@ class StompParallelTest extends StompTes
assert_received(3, "my-sub-name")
}
+ test("Topic /w Wildcard durable sub retains messages.") {
+ connect("1.1")
+ val dest = next_id("/topic/dsub_test_")
+ subscribe("my-sub-name", dest+".*", persistent=true)
+ client.close
+
+ // Close him out.. since persistent:true then
+ // the topic subscription will be persistent across client
+ // connections.
+ connect("1.1")
+ async_send(dest+".1", 1)
+ async_send(dest+".2", 2)
+ async_send(dest+".3", 3)
+
+ subscribe("my-sub-name", dest, persistent=true)
+
+ assert_received(1, "my-sub-name")
+ assert_received(2, "my-sub-name")
+ assert_received(3, "my-sub-name")
+ }
+
test("Queue and a selector") {
connect("1.1")