You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/11 18:09:32 UTC

svn commit: r1134680 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: LocalRouter.scala Queue.scala QueueBinding.scala Topic.scala

Author: chirino
Date: Sat Jun 11 16:09:32 2011
New Revision: 1134680

URL: http://svn.apache.org/viewvc?rev=1134680&view=rev
Log:
Further work to support https://issues.apache.org/jira/browse/APLO-39 : we now handle topic and queue config updates.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1134680&r1=1134679&r2=1134680&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Sat Jun 11 16:09:32 2011
@@ -44,6 +44,7 @@ trait DomainDestination {
   def connect (destination:DestinationDTO, producer:BindableDeliveryProducer)
   def disconnect (producer:BindableDeliveryProducer)
 
+  def update(on_completed:Runnable):Unit
 }
 
 /**
@@ -130,6 +131,12 @@ class LocalRouter(val virtual_host:Virtu
       collectionAsScalaIterable(destination_by_path.get( path ))
     }
 
+    def apply_update(traker:LoggingTracker) = {
+      destinations.foreach { dest=>
+        dest.update(traker.task("update "+dest))
+      }
+    }
+
     def create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[D,String]
 
     def get_or_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[D,String] = {
@@ -342,7 +349,7 @@ class LocalRouter(val virtual_host:Virtu
         return new Failure("Not authorized to create the destination")
       }
 
-      val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO], dto, path.toString(destination_parser))
+      val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO], ()=>topic_config(path), path.toString(destination_parser))
       add_destination(path, topic)
       Success(topic)
     }
@@ -838,16 +845,6 @@ class LocalRouter(val virtual_host:Virtu
     val config = binding.config(virtual_host)
 
     val queue = new Queue(this, qid, binding, config)
-    if( queue.tune_persistent && id == -1 ) {
-
-      val record = new QueueRecord
-      record.key = qid
-      record.binding_data = binding.binding_data
-      record.binding_kind = binding.binding_kind
-
-      virtual_host.store.add_queue(record) { rc => Unit }
-
-    }
 
     queue.start
     queues_by_binding.put(binding, queue)
@@ -915,7 +912,9 @@ class LocalRouter(val virtual_host:Virtu
   }
 
   def apply_update(on_completed:Runnable) = {
-    // TODO: check to see if any of the destination configs need updating.
-    on_completed.run()
+    val tracker = new LoggingTracker("domain update", virtual_host.broker.console_log, dispatch_queue)
+    topic_domain.apply_update(tracker)
+    queue_domain.apply_update(tracker)
+    tracker.callback(on_completed)
   }
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1134680&r1=1134679&r2=1134680&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sat Jun 11 16:09:32 2011
@@ -128,6 +128,14 @@ class Queue(val router: LocalRouter, val
     tune_swap = tune_persistent && config.swap.getOrElse(true)
     tune_swap_range_size = config.swap_range_size.getOrElse(10000)
     tune_consumer_buffer = config.consumer_buffer.getOrElse(256*1024)
+
+    if( tune_persistent ) {
+      val record = new QueueRecord
+      record.key = store_id
+      record.binding_data = binding.binding_data
+      record.binding_kind = binding.binding_kind
+      virtual_host.store.add_queue(record) { rc => Unit }
+    }
   }
   configure(config)
 
@@ -168,11 +176,63 @@ class Queue(val router: LocalRouter, val
   swap_source.setEventHandler(^{ swap_messages });
   swap_source.resume
 
+  var restored_from_store = false
+
+  def update(on_completed:Runnable) = dispatch_queue {
+    val was_persistent = tune_persistent
+    val prev_size = tune_consumer_buffer
+    configure(binding.config(virtual_host))
+    val consumer_buffer_change = tune_consumer_buffer-prev_size
+    if( consumer_buffer_change!=0 ) {
+      // for each subscription on this queue
+      all_subscriptions.values.foreach { sub =>
+        // only subscriptions with an open session (non-null sub.session)
+        if( sub.session!=null ) {
+          // adjust the queue capacity by the change in consumer buffer size.
+          addCapacity(consumer_buffer_change)
+        }
+      }
+    }
+    restore_from_store {
+      on_completed.run
+    }
+  }
+
+
+  def restore_from_store(on_completed: => Unit) {
+    if (!restored_from_store && tune_persistent) {
+      restored_from_store = true
+      virtual_host.store.list_queue_entry_ranges(store_id, tune_swap_range_size) { ranges =>
+        dispatch_queue {
+          if (ranges != null && !ranges.isEmpty) {
+
+            ranges.foreach {
+              range =>
+                val entry = new QueueEntry(Queue.this, range.first_entry_seq).init(range)
+                entries.addLast(entry)
+
+                message_seq_counter = range.last_entry_seq + 1
+                enqueue_item_counter += range.count
+                enqueue_size_counter += range.size
+                tail_entry = new QueueEntry(Queue.this, next_message_seq)
+            }
+
+            debug("restored: " + enqueue_item_counter)
+          }
+          on_completed
+        }
+      }
+    } else {
+      on_completed
+    }
+  }
+
   protected def _start(on_completed: Runnable) = {
 
     swapped_in_size_max = tune_queue_buffer;
 
-    def completed: Unit = {
+    restore_from_store {
+
       // by the time this is run, consumers and producers may have already joined.
       on_completed.run
       schedule_periodic_maintenance
@@ -184,32 +244,7 @@ class Queue(val router: LocalRouter, val
       // kick off dispatching to the consumers.
       trigger_swap
       dispatch_queue << head_entry
-    }
-
-    if( tune_persistent ) {
 
-      virtual_host.store.list_queue_entry_ranges(store_id, tune_swap_range_size) { ranges=>
-        dispatch_queue {
-          if( ranges!=null && !ranges.isEmpty ) {
-
-            ranges.foreach { range =>
-              val entry = new QueueEntry(Queue.this, range.first_entry_seq).init(range)
-              entries.addLast(entry)
-
-              message_seq_counter = range.last_entry_seq + 1
-              enqueue_item_counter += range.count
-              enqueue_size_counter += range.size
-              tail_entry = new QueueEntry(Queue.this, next_message_seq)
-            }
-
-            debug("restored: "+enqueue_item_counter)
-          }
-          completed
-        }
-      }
-
-    } else {
-      completed
     }
   }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?rev=1134680&r1=1134679&r2=1134680&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Sat Jun 11 16:09:32 2011
@@ -117,6 +117,20 @@ object QueueDomainQueueBinding extends Q
       new QueueDomainQueueBinding(JsonCodec.encode(ptp_dto), ptp_dto)
     case _ => null
   }
+
+  def queue_config(virtual_host:VirtualHost, path:Path):QueueDTO = {
+    import collection.JavaConversions._
+    import LocalRouter.destination_parser._
+
+    def matches(x:QueueDTO):Boolean = {
+      if( x.id != null && !decode_filter(x.id).matches(path)) {
+        return false
+      }
+      true
+    }
+    virtual_host.config.queues.find(matches _).getOrElse(new QueueDTO)
+  }
+
 }
 
 
@@ -151,18 +165,7 @@ class QueueDomainQueueBinding(val bindin
   }
 
 
-  def config(host:VirtualHost):QueueDTO = {
-    import collection.JavaConversions._
-    import LocalRouter.destination_parser._
-
-    def matches(x:QueueDTO):Boolean = {
-      if( x.id != null && !decode_filter(x.id).matches(destination)) {
-        return false
-      }
-      true
-    }
-    host.config.queues.find(matches _).getOrElse(new QueueDTO)
-  }
+  def config(host:VirtualHost):QueueDTO = queue_config(host, destination)
 
 }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1134680&r1=1134679&r2=1134680&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Sat Jun 11 16:09:32 2011
@@ -17,6 +17,7 @@
 package org.apache.activemq.apollo.broker
 
 import org.apache.activemq.apollo.util._
+import path.PathParser._
 import scala.collection.immutable.List
 import org.apache.activemq.apollo.util.path.Path
 import org.apache.activemq.apollo.dto._
@@ -30,13 +31,14 @@ import collection.mutable.{HashMap, List
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, val config:TopicDTO, val id:String) extends DomainDestination {
+class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String) extends DomainDestination {
 
   var producers = ListBuffer[BindableDeliveryProducer]()
   var consumers = ListBuffer[DeliveryConsumer]()
   var durable_subscriptions = ListBuffer[Queue]()
   var consumer_queues = HashMap[DeliveryConsumer, Queue]()
   val created_at = System.currentTimeMillis()
+  var config = config_updater()
 
   import OptionSupport._
 
@@ -44,6 +46,11 @@ class Topic(val router:LocalRouter, val 
 
   def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
 
+  def update(on_completed:Runnable) = {
+    config = config_updater()
+    on_completed.run
+  }
+
   def bind (destination: DestinationDTO, consumer:DeliveryConsumer) = {
     destination match {
       case null=> // unified queue case