You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/09/19 04:28:00 UTC

svn commit: r1387436 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala Sink.scala

Author: chirino
Date: Wed Sep 19 02:27:59 2012
New Revision: 1387436

URL: http://svn.apache.org/viewvc?rev=1387436&view=rev
Log:
Last commit for APLO-244 introduce a perf regression for the queue case which this commit should fix.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1387436&r1=1387435&r2=1387436&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Sep 19 02:27:59 2012
@@ -92,7 +92,7 @@ class Queue(val router: LocalRouter, val
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
-  val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery, Integer.MAX_VALUE, virtual_host.broker.auto_tuned_send_receiver_buffer_size) {
+  val session_manager = new SessionSinkMux[Delivery](messages, dispatch_queue, Delivery, Integer.MAX_VALUE, 1024*640) {
     override def time_stamp = now
   }
 
@@ -223,10 +223,14 @@ class Queue(val router: LocalRouter, val
   def configure(update:QueueSettingsDTO) = {
     def mem_size(value:String, default:String) = MemoryPropertyEditor.parse(Option(value).getOrElse(default)).toInt
 
-    producer_swapped_in.size_max += mem_size(update.tail_buffer, "640k") - Option(config).map{ config=>
+    var new_tail_buffer = mem_size(update.tail_buffer, "640k")
+    var old_tail_buffer = Option(config).map { config =>
       mem_size(config.tail_buffer, "640k")
     }.getOrElse(0)
 
+    producer_swapped_in.size_max += new_tail_buffer - old_tail_buffer
+    session_manager.resize(Int.MaxValue, new_tail_buffer)
+
     tune_persistent = virtual_host.store !=null && update.persistent.getOrElse(true)
     tune_swap = tune_persistent && update.swap.getOrElse(true)
     tune_swap_range_size = update.swap_range_size.getOrElse(10000)

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=1387436&r1=1387435&r2=1387436&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Sep 19 02:27:59 2012
@@ -342,7 +342,7 @@ trait SessionSinkFilter[T] extends Sessi
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val consumer_queue:DispatchQueue, val sizer:Sizer[T], delivery_credits:Int, val size_credits:Int) {
+class SessionSinkMux[T](val downstream:Sink[(Session[T], T)], val consumer_queue:DispatchQueue, val sizer:Sizer[T], var delivery_credits:Int, var size_credits:Int) {
 
   var sessions = HashSet[Session[T]]()
   var overflowed_sessions = new LinkedNodeList[SessionLinkedNode[T]]()
@@ -370,6 +370,18 @@ class SessionSinkMux[T](val downstream:S
     }
   }
 
+  def resize(new_delivery_credits:Int, new_size_credits:Int) = consumer_queue {
+    val delivery_credits_change = new_delivery_credits - delivery_credits
+    if ( delivery_credits_change!=0 ) {
+      for ( session <- sessions ) {
+        session.credit(delivery_credits_change, 0);
+      }
+    }
+    this.delivery_credits = new_delivery_credits
+    this.size_credits = new_size_credits
+    schedual_rebalance
+  }
+
   def time_stamp = 0L
 
   downstream.refiller = ^{ drain_source.merge(0x01) }