You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/11 18:09:32 UTC
svn commit: r1134680 - in
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker:
LocalRouter.scala Queue.scala QueueBinding.scala Topic.scala
Author: chirino
Date: Sat Jun 11 16:09:32 2011
New Revision: 1134680
URL: http://svn.apache.org/viewvc?rev=1134680&view=rev
Log:
Further work to support https://issues.apache.org/jira/browse/APLO-39 : we now handle topic and queue config updates.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala?rev=1134680&r1=1134679&r2=1134680&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/LocalRouter.scala Sat Jun 11 16:09:32 2011
@@ -44,6 +44,7 @@ trait DomainDestination {
def connect (destination:DestinationDTO, producer:BindableDeliveryProducer)
def disconnect (producer:BindableDeliveryProducer)
+ def update(on_completed:Runnable):Unit
}
/**
@@ -130,6 +131,12 @@ class LocalRouter(val virtual_host:Virtu
collectionAsScalaIterable(destination_by_path.get( path ))
}
+ def apply_update(traker:LoggingTracker) = {
+ destinations.foreach { dest=>
+ dest.update(traker.task("update "+dest))
+ }
+ }
+
def create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[D,String]
def get_or_create_destination(path:Path, destination:DestinationDTO, security:SecurityContext):Result[D,String] = {
@@ -342,7 +349,7 @@ class LocalRouter(val virtual_host:Virtu
return new Failure("Not authorized to create the destination")
}
- val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO], dto, path.toString(destination_parser))
+ val topic = new Topic(LocalRouter.this, destination.asInstanceOf[TopicDestinationDTO], ()=>topic_config(path), path.toString(destination_parser))
add_destination(path, topic)
Success(topic)
}
@@ -838,16 +845,6 @@ class LocalRouter(val virtual_host:Virtu
val config = binding.config(virtual_host)
val queue = new Queue(this, qid, binding, config)
- if( queue.tune_persistent && id == -1 ) {
-
- val record = new QueueRecord
- record.key = qid
- record.binding_data = binding.binding_data
- record.binding_kind = binding.binding_kind
-
- virtual_host.store.add_queue(record) { rc => Unit }
-
- }
queue.start
queues_by_binding.put(binding, queue)
@@ -915,7 +912,9 @@ class LocalRouter(val virtual_host:Virtu
}
def apply_update(on_completed:Runnable) = {
- // TODO: check to see if any of the destination configs need updating.
- on_completed.run()
+ val tracker = new LoggingTracker("domain update", virtual_host.broker.console_log, dispatch_queue)
+ topic_domain.apply_update(tracker)
+ queue_domain.apply_update(tracker)
+ tracker.callback(on_completed)
}
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1134680&r1=1134679&r2=1134680&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Sat Jun 11 16:09:32 2011
@@ -128,6 +128,14 @@ class Queue(val router: LocalRouter, val
tune_swap = tune_persistent && config.swap.getOrElse(true)
tune_swap_range_size = config.swap_range_size.getOrElse(10000)
tune_consumer_buffer = config.consumer_buffer.getOrElse(256*1024)
+
+ if( tune_persistent ) {
+ val record = new QueueRecord
+ record.key = store_id
+ record.binding_data = binding.binding_data
+ record.binding_kind = binding.binding_kind
+ virtual_host.store.add_queue(record) { rc => Unit }
+ }
}
configure(config)
@@ -168,11 +176,63 @@ class Queue(val router: LocalRouter, val
swap_source.setEventHandler(^{ swap_messages });
swap_source.resume
+ var restored_from_store = false
+
+ def update(on_completed:Runnable) = dispatch_queue {
+ val was_persistent = tune_persistent
+ val prev_size = tune_consumer_buffer
+ configure(binding.config(virtual_host))
+ val consumer_buffer_change = tune_consumer_buffer-prev_size
+ if( consumer_buffer_change!=0 ) {
+ // for each
+ all_subscriptions.values.foreach { sub =>
+ // open session
+ if( sub.session!=null ) {
+ // change the queue capacity, by the change in consumer buffer change.
+ addCapacity(consumer_buffer_change)
+ }
+ }
+ }
+ restore_from_store {
+ on_completed.run
+ }
+ }
+
+
+ def restore_from_store(on_completed: => Unit) {
+ if (!restored_from_store && tune_persistent) {
+ restored_from_store = true
+ virtual_host.store.list_queue_entry_ranges(store_id, tune_swap_range_size) { ranges =>
+ dispatch_queue {
+ if (ranges != null && !ranges.isEmpty) {
+
+ ranges.foreach {
+ range =>
+ val entry = new QueueEntry(Queue.this, range.first_entry_seq).init(range)
+ entries.addLast(entry)
+
+ message_seq_counter = range.last_entry_seq + 1
+ enqueue_item_counter += range.count
+ enqueue_size_counter += range.size
+ tail_entry = new QueueEntry(Queue.this, next_message_seq)
+ }
+
+ debug("restored: " + enqueue_item_counter)
+ }
+ on_completed
+ }
+ }
+ } else {
+ on_completed
+ }
+ }
+
protected def _start(on_completed: Runnable) = {
swapped_in_size_max = tune_queue_buffer;
- def completed: Unit = {
+ restore_from_store {
+
// by the time this is run, consumers and producers may have already joined.
on_completed.run
schedule_periodic_maintenance
@@ -184,32 +244,7 @@ class Queue(val router: LocalRouter, val
// kick off dispatching to the consumers.
trigger_swap
dispatch_queue << head_entry
- }
-
- if( tune_persistent ) {
- virtual_host.store.list_queue_entry_ranges(store_id, tune_swap_range_size) { ranges=>
- dispatch_queue {
- if( ranges!=null && !ranges.isEmpty ) {
-
- ranges.foreach { range =>
- val entry = new QueueEntry(Queue.this, range.first_entry_seq).init(range)
- entries.addLast(entry)
-
- message_seq_counter = range.last_entry_seq + 1
- enqueue_item_counter += range.count
- enqueue_size_counter += range.size
- tail_entry = new QueueEntry(Queue.this, next_message_seq)
- }
-
- debug("restored: "+enqueue_item_counter)
- }
- completed
- }
- }
-
- } else {
- completed
}
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala?rev=1134680&r1=1134679&r2=1134680&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/QueueBinding.scala Sat Jun 11 16:09:32 2011
@@ -117,6 +117,20 @@ object QueueDomainQueueBinding extends Q
new QueueDomainQueueBinding(JsonCodec.encode(ptp_dto), ptp_dto)
case _ => null
}
+
+ def queue_config(virtual_host:VirtualHost, path:Path):QueueDTO = {
+ import collection.JavaConversions._
+ import LocalRouter.destination_parser._
+
+ def matches(x:QueueDTO):Boolean = {
+ if( x.id != null && !decode_filter(x.id).matches(path)) {
+ return false
+ }
+ true
+ }
+ virtual_host.config.queues.find(matches _).getOrElse(new QueueDTO)
+ }
+
}
@@ -151,18 +165,7 @@ class QueueDomainQueueBinding(val bindin
}
- def config(host:VirtualHost):QueueDTO = {
- import collection.JavaConversions._
- import LocalRouter.destination_parser._
-
- def matches(x:QueueDTO):Boolean = {
- if( x.id != null && !decode_filter(x.id).matches(destination)) {
- return false
- }
- true
- }
- host.config.queues.find(matches _).getOrElse(new QueueDTO)
- }
+ def config(host:VirtualHost):QueueDTO = queue_config(host, destination)
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala?rev=1134680&r1=1134679&r2=1134680&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Topic.scala Sat Jun 11 16:09:32 2011
@@ -17,6 +17,7 @@
package org.apache.activemq.apollo.broker
import org.apache.activemq.apollo.util._
+import path.PathParser._
import scala.collection.immutable.List
import org.apache.activemq.apollo.util.path.Path
import org.apache.activemq.apollo.dto._
@@ -30,13 +31,14 @@ import collection.mutable.{HashMap, List
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, val config:TopicDTO, val id:String) extends DomainDestination {
+class Topic(val router:LocalRouter, val destination_dto:TopicDestinationDTO, var config_updater: ()=>TopicDTO, val id:String) extends DomainDestination {
var producers = ListBuffer[BindableDeliveryProducer]()
var consumers = ListBuffer[DeliveryConsumer]()
var durable_subscriptions = ListBuffer[Queue]()
var consumer_queues = HashMap[DeliveryConsumer, Queue]()
val created_at = System.currentTimeMillis()
+ var config = config_updater()
import OptionSupport._
@@ -44,6 +46,11 @@ class Topic(val router:LocalRouter, val
def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
+ def update(on_completed:Runnable) = {
+ config = config_updater()
+ on_completed.run
+ }
+
def bind (destination: DestinationDTO, consumer:DeliveryConsumer) = {
destination match {
case null=> // unified queue case