You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/08/20 19:20:50 UTC

svn commit: r1375112 - in /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala Subscription.scala

Author: chirino
Date: Mon Aug 20 17:20:50 2012
New Revision: 1375112

URL: http://svn.apache.org/viewvc?rev=1375112&view=rev
Log:
Use the average dequeue rate over the last 15 seconds to figure out what the dequeue rate of the queue.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1375112&r1=1375111&r2=1375112&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Aug 20 17:20:50 2012
@@ -919,7 +919,7 @@ class Queue(val router: LocalRouter, val
       consumer_stall_ms += cs
       load_stall_ms += ls
       if(!sub.browser) {
-        delivery_rate += sub.enqueue_size_per_interval
+        delivery_rate += sub.avg_enqueue_size_per_interval
       }
     }
 

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala?rev=1375112&r1=1375111&r2=1375112&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Subscription.scala Mon Aug 20 17:20:50 2012
@@ -21,6 +21,7 @@ import org.fusesource.hawtdispatch._
 import org.apache.activemq.apollo.broker.store._
 import org.apache.activemq.apollo.util._
 import org.apache.activemq.apollo.util.list._
+
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
@@ -46,7 +47,20 @@ class Subscription(val queue:Queue, val 
   var acquired_size = 0L
   def acquired_count = acquired.size()
 
-  var enqueue_size_per_interval = 0
+  var enqueue_size_per_interval = new CircularBuffer[Int](15)
+
+  def avg_enqueue_size_per_interval = {
+    var rc = 0
+    if( enqueue_size_per_interval.size > 0 ) {
+      for( x <- enqueue_size_per_interval ) {
+        rc += x
+      }
+      rc = rc/ enqueue_size_per_interval.size
+    }
+    rc
+  }
+
+
   var enqueue_size_at_last_interval = 0L
 
   var consumer_stall_ms = 0L
@@ -238,7 +252,7 @@ class Subscription(val queue:Queue, val 
   }
 
   def adjust_prefetch_size = {
-    enqueue_size_per_interval = (session.enqueue_size_counter - enqueue_size_at_last_interval).toInt
+    enqueue_size_per_interval += (session.enqueue_size_counter - enqueue_size_at_last_interval).toInt
     enqueue_size_at_last_interval = session.enqueue_size_counter
 
     if(consumer_stall_start !=0) {