You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/20 11:02:36 UTC

svn commit: r1291169 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Author: chirino
Date: Mon Feb 20 10:02:35 2012
New Revision: 1291169

URL: http://svn.apache.org/viewvc?rev=1291169&view=rev
Log:
If a queue consumer did not ack sooner than the consumer buffer, we could stop dispatching to him if the messages needed to be prefetched.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1291169&r1=1291168&r2=1291169&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Feb 20 10:02:35 2012
@@ -306,7 +306,7 @@ class Queue(val router: LocalRouter, val
         e.count = cur.count
         e.size = cur.size
         e.consumer_count = cur.parked.size
-        e.is_prefetched = cur.is_prefetched
+        e.is_prefetched = cur.prefetched
         e.state = cur.label
 
         rc.entries.add(e)
@@ -636,7 +636,7 @@ class Queue(val router: LocalRouter, val
     var total_items = 0L
     var total_size = 0L
     while (cur != null) {
-      if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_swapped_range ) {
+      if (cur.is_loaded || cur.hasSubs || cur.prefetched || cur.is_swapped_range ) {
         info("  => " + cur)
       }
 
@@ -1110,8 +1110,6 @@ class QueueEntry(val queue:Queue, val se
   // The current state of the entry: Head | Tail | Loaded | Swapped | SwappedRange
   var state:EntryState = new Tail
 
-  def is_prefetched = prefetched
-
   def <(value:QueueEntry) = this.seq < value.seq
   def <=(value:QueueEntry) = this.seq <= value.seq
 
@@ -2209,7 +2207,7 @@ class Subscription(val queue:Queue, val 
       pos // start prefetching from the current position.
     }
 
-    var remaining = queue.tune_consumer_buffer - acquired_size;
+    var remaining = queue.tune_consumer_buffer;
     while( remaining>0 && cursor!=null ) {
       val next = cursor.getNext
       // Browsers prefetch all messages..