You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/08 21:54:14 UTC

svn commit: r1043673 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala Router.scala

Author: chirino
Date: Wed Dec  8 20:54:14 2010
New Revision: 1043673

URL: http://svn.apache.org/viewvc?rev=1043673&view=rev
Log:
- More queue prefetch tuning.
- Set the default policy to queue messages in the slow topic consumer case.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1043673&r1=1043672&r2=1043673&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Dec  8 20:54:14 2010
@@ -35,14 +35,18 @@ import OptionSupport._
 
 object Queue extends Log {
   val subcsription_counter = new AtomicInteger(0)
+
+  val PREFTCH_LOAD_FLAG = 1.toByte
+  val PREFTCH_HOLD_FLAG = 2.toByte
 }
 
+import Queue._
+
 /**
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val host: VirtualHost, var id:Long, val binding:Binding, var config:QueueDTO) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
-  override protected def log = Queue
+class Queue(val host: VirtualHost, var id:Long, val binding:Binding, var config:QueueDTO) extends BaseRetained with Route with DeliveryConsumer with BaseService {
 
   var inbound_sessions = Set[DeliverySession]()
   var all_subscriptions = Map[DeliveryConsumer, Subscription]()
@@ -396,25 +400,20 @@ class Queue(val host: VirtualHost, var i
         // target tune_min_subscription_rate / sec
         all_subscriptions.foreach{ case (consumer, sub)=>
           
-          val advanced = if ( sub.tail_parkings > 0 ) {
-            // guesstimate what full speed would have been.
-            sub.advanced_size.max(sub.best_advanced_size)
-          } else {
-            sub.advanced_size
-          }
-          
-          // keep track of the last few advance sizes..
-          sub.advanced_sizes += advanced
-          while( sub.advanced_sizes.size > 10 ) {
-            sub.advanced_sizes = sub.advanced_sizes.drop(1)
-          }
+          if ( sub.tail_parkings < 0 ) {
+
+            // re-calc the avg_advanced_size
+            sub.advanced_sizes += sub.advanced_size
+            while( sub.advanced_sizes.size > 5 ) {
+              sub.advanced_sizes = sub.advanced_sizes.drop(1)
+            }
+            sub.avg_advanced_size = sub.advanced_sizes.foldLeft(0)(_ + _) /  sub.advanced_sizes.size
 
-          sub.best_advanced_size = sub.advanced_sizes.foldLeft(0)(_ max _)
+          }
           
           sub.total_advanced_size += sub.advanced_size
           sub.advanced_size = 0
           sub.tail_parkings = 0
-          
 
         }
 
@@ -591,7 +590,7 @@ class QueueEntry(val queue:Queue, val se
 
   // The number of subscriptions which have requested this entry to be prefeteched (held in memory) so that it's
   // ready for them to get dispatched.
-  var prefetch_flags = 0
+  var prefetch_flags:Byte = 0
 
   // The current state of the entry: Tail | Loaded | Flushed | Tombstone
   var state:EntryState = new Tail
@@ -1278,7 +1277,7 @@ class Subscription(val queue:Queue, val 
   var advanced_size = 0
   var advanced_sizes = ListBuffer[Int]() // use circular buffer instead.
 
-  var best_advanced_size = queue.tune_consumer_buffer * 100
+  var avg_advanced_size = queue.tune_consumer_buffer
   var tail_parkings = 1
 
   var total_dispatched_count = 0L
@@ -1388,15 +1387,15 @@ class Subscription(val queue:Queue, val 
     var remaining = queue.tune_consumer_buffer - acquired_size
     while( remaining>0 && next!=null ) {
       remaining -= next.size
-      next.prefetch_flags |= 1
+      next.prefetch_flags = (next.prefetch_flags | PREFTCH_LOAD_FLAG).toByte
       next.load
       next = next.getNext
     }
 
-    remaining = best_advanced_size 
+    remaining = avg_advanced_size
     while( remaining>0 && next!=null ) {
       remaining -= next.size
-      next.prefetch_flags |= 2
+      next.prefetch_flags = (next.prefetch_flags | PREFTCH_HOLD_FLAG).toByte
       next = next.getNext
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1043673&r1=1043672&r2=1043673&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Dec  8 20:54:14 2010
@@ -356,7 +356,7 @@ class RoutingNode(val router:Router, val
   }
 
   def unified = config.unified.getOrElse(false)
-  def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
+  def slow_consumer_policy = config.slow_consumer_policy.getOrElse("queue")
 
   var consumer_proxies = Map[DeliveryConsumer, DeliveryConsumer]()