You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:13:40 UTC
svn commit: r961172 -
/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Author: chirino
Date: Wed Jul 7 04:13:40 2010
New Revision: 961172
URL: http://svn.apache.org/viewvc?rev=961172&view=rev
Log:
Fixed dispatching problems /w messages larger than the consumer prefetch window.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961172&r1=961171&r2=961172&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:13:40 2010
@@ -308,7 +308,7 @@ class Queue(val host: VirtualHost, val d
check_counter += 1
if( (check_counter%10)==0 ) {
- display_stats
+// display_stats
}
if( (check_counter%25)==0 ) {
@@ -985,7 +985,9 @@ class QueueEntry(val queue:Queue, val se
// advance: not interested.
advancing += sub
} else {
- if( sub.full ) {
+
+ // Is the sub flow controlled?
+ if( sub.full || (sub.prefetchFull && !sub.is_prefetched(entry) ) ) {
// hold back: flow controlled
heldBack += sub
} else {
@@ -1009,13 +1011,7 @@ class QueueEntry(val queue:Queue, val se
// The acquiring sub is added last to the list so that
// the other competing subs get first dibs at the next entry.
if( acquiringSub != null ) {
-
- // Advancing may need to be held back because the sub's prefetch is full
- if( acquiringSub.prefetchFull && !acquiringSub.is_prefetched(getNext) ) {
- heldBack += acquiringSub
- } else {
- advancing += acquiringSub
- }
+ advancing += acquiringSub
}
if ( advancing.isEmpty ) {
@@ -1233,7 +1229,7 @@ class Subscription(queue:Queue) extends
}
def close() = {
- pos.-=(this)
+ pos -= this
invalidate_prefetch
@@ -1249,11 +1245,6 @@ class Subscription(queue:Queue) extends
next = next.getNext
cur.nack // this unlinks the entry.
}
-
- // show the queue entries... after we disconnect.
- queue.dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{
- queue.display_active_entries
- })
}
/**
@@ -1262,11 +1253,14 @@ class Subscription(queue:Queue) extends
*/
def advance(value:QueueEntry):Unit = {
assert(value!=null)
+ assert(pos!=null)
// Remove the previous pos from the prefetch counters.
if( prefetch_tail!=null && !pos.is_head) {
remove_from_prefetch(pos)
}
+
+
advanced_size += pos.size
pos = value
@@ -1399,8 +1393,12 @@ class Subscription(queue:Queue) extends
// we may now be able to prefetch some messages..
acquired_size -= entry.size
+
+ val next = entry.nextOrTail
entry.remove // entry size changes to 0
+
refill_prefetch
+ next.run
}
def nack = {