You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/11 23:25:50 UTC
svn commit: r1134769 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
Author: chirino
Date: Sat Jun 11 21:25:50 2011
New Revision: 1134769
URL: http://svn.apache.org/viewvc?rev=1134769&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-45 : Support auto deleting idle queues and topics
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Sat Jun 11 21:25:50 2011
@@ -66,6 +66,16 @@ object LocalRouter extends Log {
val QUEUE_KIND = "queue"
val DEFAULT_QUEUE_PATH = "default"
+ /**
+  * Reports whether a destination configuration entry applies to a
+  * wildcard set of destinations: either it has no id at all, or its
+  * id decodes to a path that contains wildcards.
+  */
+ def is_wildcard_config(dto:StringIdDTO) = {
+   // A null id yields None, for which forall is vacuously true.
+   Option(dto.id).forall { id =>
+     val decoded = destination_parser.decode_path(destination_parser.parts(id))
+     PathParser.containsWildCards(decoded)
+   }
+ }
+
class ConsumerContext(val destination:DestinationDTO, val consumer:DeliveryConsumer, val security:SecurityContext) {
override def hashCode: Int = consumer.hashCode
@@ -314,7 +324,9 @@ class LocalRouter(val virtual_host:Virtu
def topic_config(name:Path):TopicDTO = {
import collection.JavaConversions._
import destination_parser._
- virtual_host.config.topics.find( x=> decode_filter(x.id).matches(name) ).getOrElse(new TopicDTO)
+ virtual_host.config.topics.find{ x=>
+ x.id==null || decode_filter(x.id).matches(name)
+ }.getOrElse(new TopicDTO)
}
override def connect(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
@@ -351,7 +363,7 @@ class LocalRouter(val virtual_host:Virtu
return new Failure("Not authorized to create the destination")
}
- val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO], ()=>topic_config(path), path.toString(destination_parser))
+ val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO], ()=>topic_config(path), path.toString(destination_parser), path)
add_destination(path, topic)
Success(topic)
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sat Jun 11 21:25:50 2011
@@ -136,8 +136,22 @@ class Queue(val router: LocalRouter, val
record.binding_kind = binding.binding_kind
virtual_host.store.add_queue(record) { rc => Unit }
}
+
+ auto_delete_after = config.auto_delete_after.getOrElse(60*5)
+ if( auto_delete_after!= 0 ) {
+ // we don't auto delete explicitly configured queues,
+ // non destination queues, or unified queues.
+ if( config.unified.getOrElse(false) || !binding.isInstanceOf[QueueDomainQueueBinding] || !LocalRouter.is_wildcard_config(config) ) {
+ auto_delete_after = 0
+ }
+ }
+
+ println("auto_delete_after: "+this+": "+auto_delete_after)
+
+ }
+ dispatch_queue {
+ configure(config)
}
- configure(config)
var last_maintenance_ts = System.currentTimeMillis
@@ -178,6 +192,9 @@ class Queue(val router: LocalRouter, val
var restored_from_store = false
+ var auto_delete_after = 0
+ var idled_at = 0L
+
def update(on_completed:Runnable) = dispatch_queue {
val prev_persistent = tune_persistent
@@ -201,11 +218,30 @@ class Queue(val router: LocalRouter, val
swapped_in_size_max += (tune_queue_buffer-prev_queue_buffer)
restore_from_store {
+ check_idle
trigger_swap
on_completed.run
}
}
+ /**
+  * Re-evaluates whether the queue is idle, meaning it has no producers,
+  * no subscriptions and no queued items.  On the transition into the
+  * idle state the time is recorded in idled_at and, when
+  * auto_delete_after is non-zero, the queue is scheduled to be
+  * destroyed after that many seconds.  Any activity resets idled_at
+  * to 0, which invalidates a pending destroy.
+  */
+ def check_idle {
+   if (producers.isEmpty && all_subscriptions.isEmpty && queue_items==0 ) {
+     if (idled_at==0) {
+       // The idle timestamp doubles as a token identifying this idle period.
+       val now = System.currentTimeMillis()
+       idled_at = now
+       if( auto_delete_after!=0 ) {
+         dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
+           // Only destroy the queue if idled_at still holds the value
+           // recorded when this task was scheduled.
+           if( now == idled_at ) {
+             router._destroy_queue(this)
+           }
+         }
+       }
+     }
+   } else {
+     idled_at = 0
+   }
+ }
def restore_from_store(on_completed: => Unit) {
if (!restored_from_store && tune_persistent) {
@@ -236,11 +272,13 @@ class Queue(val router: LocalRouter, val
}
protected def _start(on_completed: Runnable) = {
+ println("_start auto_delete_after: "+this+": "+auto_delete_after)
swapped_in_size_max += tune_queue_buffer;
restore_from_store {
+
// by the time this is run, consumers and producers may have already joined.
on_completed.run
schedule_periodic_maintenance
@@ -250,6 +288,7 @@ class Queue(val router: LocalRouter, val
}
// kick off dispatching to the consumers.
+ check_idle
trigger_swap
dispatch_queue << head_entry
@@ -504,6 +543,7 @@ class Queue(val router: LocalRouter, val
def connect(p: DeliveryProducer) = new DeliverySession {
retain
+
override def toString = Queue.this.toString
override def consumer = Queue.this
@@ -615,6 +655,7 @@ class Queue(val router: LocalRouter, val
} else {
dispatch_queue {
producers += producer
+ check_idle
}
producer.bind(this::Nil)
}
@@ -628,6 +669,7 @@ class Queue(val router: LocalRouter, val
} else {
dispatch_queue {
producers -= producer
+ check_idle
}
producer.unbind(this::Nil)
}
@@ -1467,6 +1509,7 @@ class Subscription(val queue:Queue, val
refill_prefetch
queue.dispatch_queue << queue.head_entry
}
+ queue.check_idle
}
def close() = {
@@ -1497,6 +1540,7 @@ class Subscription(val queue:Queue, val
session = null
consumer.release
+ queue.check_idle
queue.trigger_swap
} else {}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Sat Jun 11 21:25:50 2011
@@ -123,10 +123,7 @@ object QueueDomainQueueBinding extends Q
import LocalRouter.destination_parser._
def matches(x:QueueDTO):Boolean = {
- if( x.id != null && !decode_filter(x.id).matches(path)) {
- return false
- }
- true
+ x.id==null || decode_filter(x.id).matches(path)
}
virtual_host.config.queues.find(matches _).getOrElse(new QueueDTO)
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Sat Jun 11 21:25:50 2011
@@ -17,12 +17,12 @@
package org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.util._
-import path.PathParser._
+import path.Path
import scala.collection.immutable.List
-import org.apache.activemq.apollo.util.path.Path
import org.apache.activemq.apollo.dto._
-import security.SecurityContext
import collection.mutable.{HashMap, ListBuffer}
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdispatch._
/**
* <p>
@@ -31,14 +31,19 @@ import collection.mutable.{HashMap, List
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String) extends DomainDestination {
+class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String, path:Path) extends DomainDestination {
var producers = ListBuffer[BindableDeliveryProducer]()
var consumers = ListBuffer[DeliveryConsumer]()
var durable_subscriptions = ListBuffer[Queue]()
var consumer_queues = HashMap[DeliveryConsumer, Queue]()
+ var idled_at = 0L
val created_at = System.currentTimeMillis()
- var config = config_updater()
+ var auto_delete_after = 0
+
+ var config:TopicDTO = _
+
+ refresh_config
import OptionSupport._
@@ -47,10 +52,42 @@ class Topic(val router:LocalRouter, val
def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
def update(on_completed:Runnable) = {
- config = config_updater()
+ refresh_config
on_completed.run
}
+ /**
+  * Reloads the topic configuration through config_updater, recomputes
+  * auto_delete_after from it and then re-checks the idle state.
+  */
+ def refresh_config = {
+   import OptionSupport._
+
+   config = config_updater()
+   // Falls back to 60*5 when the configuration leaves the value unset.
+   val requested: Int = config.auto_delete_after.getOrElse(60*5)
+   // we don't auto delete explicitly configured destinations.
+   auto_delete_after =
+     if( requested != 0 && !LocalRouter.is_wildcard_config(config) ) 0 else requested
+   check_idle
+ }
+
+ /**
+  * Re-evaluates whether the topic is idle, meaning it has no producers,
+  * no consumers and no durable subscriptions.  When it first becomes
+  * idle the time is stored in idled_at and, if auto_delete_after is
+  * non-zero, removal of the topic is scheduled after that many seconds.
+  */
+ def check_idle {
+   val in_use = !(producers.isEmpty && consumers.isEmpty && durable_subscriptions.isEmpty)
+   if (in_use) {
+     // Activity clears the marker, so an already scheduled removal is skipped.
+     idled_at = 0
+   } else if (idled_at==0) {
+     val idle_started = System.currentTimeMillis()
+     idled_at = idle_started
+     if( auto_delete_after!=0 ) {
+       virtual_host.dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
+         // Remove only if idled_at is unchanged since scheduling.
+         if( idle_started == idled_at ) {
+           router.topic_domain.remove_destination(path, this)
+         }
+       }
+     }
+   }
+ }
+
def bind (destination: DestinationDTO, consumer:DeliveryConsumer) = {
destination match {
case null=> // unified queue case
@@ -85,6 +122,7 @@ class Topic(val router:LocalRouter, val
})
}
+ check_idle
}
def unbind (consumer:DeliveryConsumer, persistent:Boolean) = {
@@ -117,6 +155,7 @@ class Topic(val router:LocalRouter, val
})
}
}
+ check_idle
}
@@ -133,6 +172,7 @@ class Topic(val router:LocalRouter, val
}
}
}
+ check_idle
}
def unbind_durable_subscription(destination: DurableSubscriptionDestinationDTO, queue:Queue) = {
@@ -148,16 +188,19 @@ class Topic(val router:LocalRouter, val
}
}
}
+ check_idle
}
def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
producers += producer
producer.bind(consumers.toList ::: durable_subscriptions.toList)
+ check_idle
}
def disconnect (producer:BindableDeliveryProducer) = {
producers = producers.filterNot( _ == producer )
producer.unbind(consumers.toList ::: durable_subscriptions.toList)
+ check_idle
}
}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Sat Jun 11 21:25:50 2011
@@ -28,6 +28,17 @@ import javax.xml.bind.annotation.*;
public class QueueDTO extends StringIdDTO {
/**
+ * Controls when the queue will auto delete.
+ * If set to zero, then the queue will NOT auto
+ * delete, otherwise the queue will auto delete
+ * after it has been unused for the number
+ * of seconds configured in this field. If unset,
+ * it defaults to 5 minutes.
+ */
+ @XmlAttribute(name="auto_delete_after")
+ public Integer auto_delete_after;
+
+ /**
* If set to true, then for routing purposes there is no difference between
* sending to a queue or topic of the same name. The first time
* a queue is created, it will act like if a durable
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java Sat Jun 11 21:25:50 2011
@@ -27,7 +27,18 @@ import javax.xml.bind.annotation.*;
@XmlAccessorType(XmlAccessType.FIELD)
public class TopicDTO extends StringIdDTO {
- @XmlElement(name="slow_consumer_policy")
+ /**
+ * Controls when the topic will auto delete.
+ * If set to zero, then the topic will NOT auto
+ * delete, otherwise the topic will auto delete
+ * after it has been unused for the number
+ * of seconds configured in this field. If unset,
+ * it defaults to 5 minutes.
+ */
+ @XmlAttribute(name="auto_delete_after")
+ public Integer auto_delete_after;
+
+ @XmlAttribute(name="slow_consumer_policy")
public String slow_consumer_policy;
@XmlElement(name="acl")