You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/11 23:25:50 UTC

svn commit: r1134769 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/

Author: chirino
Date: Sat Jun 11 21:25:50 2011
New Revision: 1134769

URL: http://svn.apache.org/viewvc?rev=1134769&view=rev
Log:
Fixes https://issues.apache.org/jira/browse/APLO-45 : Support auto deleting idle queues and topics

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Sat Jun 11 21:25:50 2011
@@ -66,6 +66,16 @@ object LocalRouter extends Log {
   val QUEUE_KIND = "queue"
   val DEFAULT_QUEUE_PATH = "default"
 
+  def is_wildcard_config(dto:StringIdDTO) = {
+    if( dto.id == null ) {
+      true
+    } else {
+      val parts = destination_parser.parts(dto.id)
+      val path = destination_parser.decode_path(parts)
+      PathParser.containsWildCards(path)
+    }
+  }
+
   class ConsumerContext(val destination:DestinationDTO, val consumer:DeliveryConsumer, val security:SecurityContext) {
     override def hashCode: Int = consumer.hashCode
 
@@ -314,7 +324,9 @@ class LocalRouter(val virtual_host:Virtu
     def topic_config(name:Path):TopicDTO = {
       import collection.JavaConversions._
       import destination_parser._
-      virtual_host.config.topics.find( x=> decode_filter(x.id).matches(name) ).getOrElse(new TopicDTO)
+      virtual_host.config.topics.find{ x=>
+        x.id==null || decode_filter(x.id).matches(name)
+      }.getOrElse(new TopicDTO)
     }
 
     override def connect(path:Path, destination:DestinationDTO, producer:BindableDeliveryProducer, security:SecurityContext):Unit = {
@@ -351,7 +363,7 @@ class LocalRouter(val virtual_host:Virtu
         return new Failure("Not authorized to create the destination")
       }
 
-      val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO], ()=>topic_config(path), path.toString(destination_parser))
+      val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO], ()=>topic_config(path), path.toString(destination_parser), path)
       add_destination(path, topic)
       Success(topic)
     }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sat Jun 11 21:25:50 2011
@@ -136,8 +136,22 @@ class Queue(val router: LocalRouter, val
       record.binding_kind = binding.binding_kind
       virtual_host.store.add_queue(record) { rc => Unit }
     }
+
+    auto_delete_after = config.auto_delete_after.getOrElse(60*5)
+    if( auto_delete_after!= 0 ) {
+      // We don't auto-delete explicitly configured queues,
+      // non-destination queues, or unified queues.
+      if( config.unified.getOrElse(false) || !binding.isInstanceOf[QueueDomainQueueBinding] || !LocalRouter.is_wildcard_config(config) ) {
+        auto_delete_after = 0
+      }
+    }
+
+    println("auto_delete_after: "+this+": "+auto_delete_after)
+
+  }
+  dispatch_queue {
+    configure(config)
   }
-  configure(config)
 
   var last_maintenance_ts = System.currentTimeMillis
 
@@ -178,6 +192,9 @@ class Queue(val router: LocalRouter, val
 
   var restored_from_store = false
 
+  var auto_delete_after = 0
+  var idled_at = 0L
+
   def update(on_completed:Runnable) = dispatch_queue {
 
     val prev_persistent = tune_persistent
@@ -201,11 +218,30 @@ class Queue(val router: LocalRouter, val
     swapped_in_size_max += (tune_queue_buffer-prev_queue_buffer)
 
     restore_from_store {
+      check_idle
       trigger_swap
       on_completed.run
     }
   }
 
+  def check_idle {
+    println("check_idle auto_delete_after: "+this+": "+auto_delete_after)
+    if (producers.isEmpty && all_subscriptions.isEmpty && queue_items==0 ) {
+      if (idled_at==0) {
+        val now = System.currentTimeMillis()
+        idled_at = now
+        if( auto_delete_after!=0 ) {
+          dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
+            if( now == idled_at ) {
+              router._destroy_queue(this)
+            }
+          }
+        }
+      }
+    } else {
+      idled_at = 0
+    }
+  }
 
   def restore_from_store(on_completed: => Unit) {
     if (!restored_from_store && tune_persistent) {
@@ -236,11 +272,13 @@ class Queue(val router: LocalRouter, val
   }
 
   protected def _start(on_completed: Runnable) = {
+    println("_start auto_delete_after: "+this+": "+auto_delete_after)
 
     swapped_in_size_max += tune_queue_buffer;
 
     restore_from_store {
 
+
       // by the time this is run, consumers and producers may have already joined.
       on_completed.run
       schedule_periodic_maintenance
@@ -250,6 +288,7 @@ class Queue(val router: LocalRouter, val
       }
 
       // kick off dispatching to the consumers.
+      check_idle
       trigger_swap
       dispatch_queue << head_entry
 
@@ -504,6 +543,7 @@ class Queue(val router: LocalRouter, val
   def connect(p: DeliveryProducer) = new DeliverySession {
     retain
 
+
     override def toString = Queue.this.toString
 
     override def consumer = Queue.this
@@ -615,6 +655,7 @@ class Queue(val router: LocalRouter, val
     } else {
       dispatch_queue {
         producers += producer
+        check_idle
       }
       producer.bind(this::Nil)
     }
@@ -628,6 +669,7 @@ class Queue(val router: LocalRouter, val
     } else {
       dispatch_queue {
         producers -= producer
+        check_idle
       }
       producer.unbind(this::Nil)
     }
@@ -1467,6 +1509,7 @@ class Subscription(val queue:Queue, val 
       refill_prefetch
       queue.dispatch_queue << queue.head_entry
     }
+    queue.check_idle
   }
 
   def close() = {
@@ -1497,6 +1540,7 @@ class Subscription(val queue:Queue, val 
       session = null
       consumer.release
 
+      queue.check_idle
       queue.trigger_swap
     } else {}
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Sat Jun 11 21:25:50 2011
@@ -123,10 +123,7 @@ object QueueDomainQueueBinding extends Q
     import LocalRouter.destination_parser._
 
     def matches(x:QueueDTO):Boolean = {
-      if( x.id != null && !decode_filter(x.id).matches(path)) {
-        return false
-      }
-      true
+      x.id==null || decode_filter(x.id).matches(path)
     }
     virtual_host.config.queues.find(matches _).getOrElse(new QueueDTO)
   }

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Sat Jun 11 21:25:50 2011
@@ -17,12 +17,12 @@
 package org.apache.activemq.apollo.broker
 
 import org.apache.activemq.apollo.util._
-import path.PathParser._
+import path.Path
 import scala.collection.immutable.List
-import org.apache.activemq.apollo.util.path.Path
 import org.apache.activemq.apollo.dto._
-import security.SecurityContext
 import collection.mutable.{HashMap, ListBuffer}
+import java.util.concurrent.TimeUnit
+import org.fusesource.hawtdispatch._
 
 /**
  * <p>
@@ -31,14 +31,19 @@ import collection.mutable.{HashMap, List
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String) extends DomainDestination {
+class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String, path:Path) extends DomainDestination {
 
   var producers = ListBuffer[BindableDeliveryProducer]()
   var consumers = ListBuffer[DeliveryConsumer]()
   var durable_subscriptions = ListBuffer[Queue]()
   var consumer_queues = HashMap[DeliveryConsumer, Queue]()
+  var idled_at = 0L
   val created_at = System.currentTimeMillis()
-  var config = config_updater()
+  var auto_delete_after = 0
+
+  var config:TopicDTO = _
+
+  refresh_config
 
   import OptionSupport._
 
@@ -47,10 +52,42 @@ class Topic(val router:LocalRouter, val 
   def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
 
   def update(on_completed:Runnable) = {
-    config = config_updater()
+    refresh_config
     on_completed.run
   }
 
+  def refresh_config = {
+    import OptionSupport._
+
+    config = config_updater()
+    auto_delete_after = config.auto_delete_after.getOrElse(60*5)
+    if( auto_delete_after!= 0 ) {
+      // we don't auto delete explicitly configured destinations.
+      if( !LocalRouter.is_wildcard_config(config) ) {
+        auto_delete_after = 0
+      }
+    }
+    check_idle
+  }
+
+  def check_idle {
+    if (producers.isEmpty && consumers.isEmpty && durable_subscriptions.isEmpty) {
+      if (idled_at==0) {
+        val now = System.currentTimeMillis()
+        idled_at = now
+        if( auto_delete_after!=0 ) {
+          virtual_host.dispatch_queue.after(auto_delete_after, TimeUnit.SECONDS) {
+            if( now == idled_at ) {
+              router.topic_domain.remove_destination(path, this)
+            }
+          }
+        }
+      }
+    } else {
+      idled_at = 0
+    }
+  }
+
   def bind (destination: DestinationDTO, consumer:DeliveryConsumer) = {
     destination match {
       case null=> // unified queue case
@@ -85,6 +122,7 @@ class Topic(val router:LocalRouter, val 
         })
 
     }
+    check_idle
   }
 
   def unbind (consumer:DeliveryConsumer, persistent:Boolean) = {
@@ -117,6 +155,7 @@ class Topic(val router:LocalRouter, val 
           })
         }
     }
+    check_idle
 
   }
 
@@ -133,6 +172,7 @@ class Topic(val router:LocalRouter, val 
         }
       }
     }
+    check_idle
   }
 
   def unbind_durable_subscription(destination: DurableSubscriptionDestinationDTO, queue:Queue)  = {
@@ -148,16 +188,19 @@ class Topic(val router:LocalRouter, val 
         }
       }
     }
+    check_idle
   }
 
   def connect (destination:DestinationDTO, producer:BindableDeliveryProducer) = {
     producers += producer
     producer.bind(consumers.toList ::: durable_subscriptions.toList)
+    check_idle
   }
 
   def disconnect (producer:BindableDeliveryProducer) = {
     producers = producers.filterNot( _ == producer )
     producer.unbind(consumers.toList ::: durable_subscriptions.toList)
+    check_idle
   }
 
 }

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Sat Jun 11 21:25:50 2011
@@ -28,6 +28,17 @@ import javax.xml.bind.annotation.*;
 public class QueueDTO extends StringIdDTO {
 
     /**
+     * Controls when the queue will auto delete.
+     * If set to zero, then the queue will NOT auto
+     * delete, otherwise the queue will auto delete
+     * after it has been unused for the number
+     * of seconds configured in this field.  If unset,
+     * it defaults to 5 minutes.
+     */
+    @XmlAttribute(name="auto_delete_after")
+    public Integer auto_delete_after;
+
+    /**
      * If set to true, then routing then there is no difference between
      * sending to a queue or topic of the same name.  The first time
      * a queue is created, it will act like if a durable

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java?rev=1134769&r1=1134768&r2=1134769&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/TopicDTO.java Sat Jun 11 21:25:50 2011
@@ -27,7 +27,18 @@ import javax.xml.bind.annotation.*;
 @XmlAccessorType(XmlAccessType.FIELD)
 public class TopicDTO extends StringIdDTO {
 
-    @XmlElement(name="slow_consumer_policy")
+    /**
+     * Controls when the topic will auto delete.
+     * If set to zero, then the topic will NOT auto
+     * delete, otherwise the topic will auto delete
+     * after it has been unused for the number
+     * of seconds configured in this field.  If unset,
+     * it defaults to 5 minutes.
+     */
+    @XmlAttribute(name="auto_delete_after")
+    public Integer auto_delete_after;
+
+    @XmlAttribute(name="slow_consumer_policy")
     public String slow_consumer_policy;
 
     @XmlElement(name="acl")