You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/20 11:02:36 UTC
svn commit: r1291169 -
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Author: chirino
Date: Mon Feb 20 10:02:35 2012
New Revision: 1291169
URL: http://svn.apache.org/viewvc?rev=1291169&view=rev
Log:
If a queue consumer did not ack its messages before its consumer buffer filled up, we could stop dispatching to it when the messages needed to be prefetched.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1291169&r1=1291168&r2=1291169&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Feb 20 10:02:35 2012
@@ -306,7 +306,7 @@ class Queue(val router: LocalRouter, val
e.count = cur.count
e.size = cur.size
e.consumer_count = cur.parked.size
- e.is_prefetched = cur.is_prefetched
+ e.is_prefetched = cur.prefetched
e.state = cur.label
rc.entries.add(e)
@@ -636,7 +636,7 @@ class Queue(val router: LocalRouter, val
var total_items = 0L
var total_size = 0L
while (cur != null) {
- if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_swapped_range ) {
+ if (cur.is_loaded || cur.hasSubs || cur.prefetched || cur.is_swapped_range ) {
info(" => " + cur)
}
@@ -1110,8 +1110,6 @@ class QueueEntry(val queue:Queue, val se
// The current state of the entry: Head | Tail | Loaded | Swapped | SwappedRange
var state:EntryState = new Tail
- def is_prefetched = prefetched
-
def <(value:QueueEntry) = this.seq < value.seq
def <=(value:QueueEntry) = this.seq <= value.seq
@@ -2209,7 +2207,7 @@ class Subscription(val queue:Queue, val
pos // start prefetching from the current position.
}
- var remaining = queue.tune_consumer_buffer - acquired_size;
+ var remaining = queue.tune_consumer_buffer;
while( remaining>0 && cursor!=null ) {
val next = cursor.getNext
// Browsers prefetch all messages..