You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/08 21:54:14 UTC
svn commit: r1043673 - in
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker:
Queue.scala Router.scala
Author: chirino
Date: Wed Dec 8 20:54:14 2010
New Revision: 1043673
URL: http://svn.apache.org/viewvc?rev=1043673&view=rev
Log:
- More queue prefetch tunning.
- Set the default policy to queue messages in the slow topic consumer case.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1043673&r1=1043672&r2=1043673&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Dec 8 20:54:14 2010
@@ -35,14 +35,18 @@ import OptionSupport._
object Queue extends Log {
val subcsription_counter = new AtomicInteger(0)
+
+ val PREFTCH_LOAD_FLAG = 1.toByte
+ val PREFTCH_HOLD_FLAG = 2.toByte
}
+import Queue._
+
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val host: VirtualHost, var id:Long, val binding:Binding, var config:QueueDTO) extends BaseRetained with Route with DeliveryConsumer with BaseService with DispatchLogging {
- override protected def log = Queue
+class Queue(val host: VirtualHost, var id:Long, val binding:Binding, var config:QueueDTO) extends BaseRetained with Route with DeliveryConsumer with BaseService {
var inbound_sessions = Set[DeliverySession]()
var all_subscriptions = Map[DeliveryConsumer, Subscription]()
@@ -396,25 +400,20 @@ class Queue(val host: VirtualHost, var i
// target tune_min_subscription_rate / sec
all_subscriptions.foreach{ case (consumer, sub)=>
- val advanced = if ( sub.tail_parkings > 0 ) {
- // guesstimate what full speed would have been.
- sub.advanced_size.max(sub.best_advanced_size)
- } else {
- sub.advanced_size
- }
-
- // keep track of the last few advance sizes..
- sub.advanced_sizes += advanced
- while( sub.advanced_sizes.size > 10 ) {
- sub.advanced_sizes = sub.advanced_sizes.drop(1)
- }
+ if ( sub.tail_parkings < 0 ) {
+
+ // re-calc the avg_advanced_size
+ sub.advanced_sizes += sub.advanced_size
+ while( sub.advanced_sizes.size > 5 ) {
+ sub.advanced_sizes = sub.advanced_sizes.drop(1)
+ }
+ sub.avg_advanced_size = sub.advanced_sizes.foldLeft(0)(_ + _) / sub.advanced_sizes.size
- sub.best_advanced_size = sub.advanced_sizes.foldLeft(0)(_ max _)
+ }
sub.total_advanced_size += sub.advanced_size
sub.advanced_size = 0
sub.tail_parkings = 0
-
}
@@ -591,7 +590,7 @@ class QueueEntry(val queue:Queue, val se
// The number of subscriptions which have requested this entry to be prefeteched (held in memory) so that it's
// ready for them to get dispatched.
- var prefetch_flags = 0
+ var prefetch_flags:Byte = 0
// The current state of the entry: Tail | Loaded | Flushed | Tombstone
var state:EntryState = new Tail
@@ -1278,7 +1277,7 @@ class Subscription(val queue:Queue, val
var advanced_size = 0
var advanced_sizes = ListBuffer[Int]() // use circular buffer instead.
- var best_advanced_size = queue.tune_consumer_buffer * 100
+ var avg_advanced_size = queue.tune_consumer_buffer
var tail_parkings = 1
var total_dispatched_count = 0L
@@ -1388,15 +1387,15 @@ class Subscription(val queue:Queue, val
var remaining = queue.tune_consumer_buffer - acquired_size
while( remaining>0 && next!=null ) {
remaining -= next.size
- next.prefetch_flags |= 1
+ next.prefetch_flags = (next.prefetch_flags | PREFTCH_LOAD_FLAG).toByte
next.load
next = next.getNext
}
- remaining = best_advanced_size
+ remaining = avg_advanced_size
while( remaining>0 && next!=null ) {
remaining -= next.size
- next.prefetch_flags |= 2
+ next.prefetch_flags = (next.prefetch_flags | PREFTCH_HOLD_FLAG).toByte
next = next.getNext
}
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=1043673&r1=1043672&r2=1043673&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Dec 8 20:54:14 2010
@@ -356,7 +356,7 @@ class RoutingNode(val router:Router, val
}
def unified = config.unified.getOrElse(false)
- def slow_consumer_policy = config.slow_consumer_policy.getOrElse("block")
+ def slow_consumer_policy = config.slow_consumer_policy.getOrElse("queue")
var consumer_proxies = Map[DeliveryConsumer, DeliveryConsumer]()