You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:11:40 UTC
svn commit: r961155 -
/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Author: chirino
Date: Wed Jul 7 04:11:40 2010
New Revision: 961155
URL: http://svn.apache.org/viewvc?rev=961155&view=rev
Log:
Improved queue entry state tracking.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961155&r1=961154&r2=961155&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:11:40 2010
@@ -257,7 +257,7 @@ class Queue(val host: VirtualHost, val d
}
} else {
if( !sub.slow ) {
- debug("slow interval: %d, %d, %d", sub.slowIntervals, sub.tailParkings, cursorDelta)
+// debug("slow interval: %d, %d, %d", sub.slowIntervals, sub.tailParkings, cursorDelta)
sub.slowIntervals += 1
if( sub.slow ) {
debug("consumer is slow: %s", consumer)
@@ -292,7 +292,7 @@ class Queue(val host: VirtualHost, val d
}
- // Trigger a swap if we have slow consumers and we are full..
+ // Trigger a swap if we have consumers waiting for messages and we are full..
if( idleConsumerCount > 0 && messages.full && flushingSize==0 ) {
swap
}
@@ -633,7 +633,6 @@ class QueueEntry(val queue:Queue, val se
def flush = entry
def tombstone = {
- queue.size -= size
// Update the prefetch counter to reflect that this entry is no longer being prefetched.
var cur = entry
@@ -642,9 +641,6 @@ class QueueEntry(val queue:Queue, val se
(cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(entry) ) { sub.removePrefetch(entry) } }
}
cur = cur.getPrevious
-
- // Sanity check.. we should always stop before we get to the last entry in the list.
- assert( cur != null , "illegal prefetch state detected.")
}
// if rv and lv are both adjacent tombstones, then this merges the rv
@@ -774,6 +770,7 @@ class QueueEntry(val queue:Queue, val se
flushing = false
queue.flushingSize-=size
}
+ queue.size -= size
super.tombstone
}
@@ -920,27 +917,24 @@ class QueueEntry(val queue:Queue, val se
// Flushed entries can't be dispatched until
// they get loaded.
def dispatch():QueueEntry = {
- if( !loading ) {
- var remaining = queue.tune_subscription_buffer - size
- load
+ if( !loading && hasSubs) {
- // make sure the next few entries are loaded too..
- var cur = getNext
- while( remaining>0 && cur!=null ) {
- remaining -= cur.size
- val flushed = cur.asFlushed
- if( flushed!=null && !flushed.loading) {
- flushed.load
- }
- cur = getNext
+ // I don't think this should ever happen as we should be prefetching the
+ // entry before we dispatch it.
+ warn("dispatch called on a flushed entry that is not loading.")
+
+ // ask the subs to fill the prefetches.. that should
+ // kick off a load
+ (browsing ::: competing).foreach { sub =>
+ sub.fillPrefetch
}
-
}
null
}
override def load() = {
if( !loading ) {
+// trace("Start entry load of message seq: %s", seq)
// start loading it back...
loading = true
queue.loadingSize += size
@@ -948,12 +942,15 @@ class QueueEntry(val queue:Queue, val se
// pass off to a source so it can aggregate multiple
// loads to reduce cross thread synchronization
if( delivery.isDefined ) {
+// debug("Store found message seq: %d", seq)
queue.store_load_source.merge((this, delivery.get))
} else {
+
+ debug("Detected store drop of message seq: %d", seq)
+
// Looks like someone else removed the message from the store.. lets just
// tombstone this entry now.
queue.dispatchQueue {
- debug("Detected store drop of message key: %d", messageKey)
tombstone
}
}
@@ -964,6 +961,7 @@ class QueueEntry(val queue:Queue, val se
def loaded(messageRecord:MessageRecord) = {
if( loading ) {
+// debug("Loaded message seq: ", seq )
loading = false
queue.loadingSize -= size
@@ -974,12 +972,15 @@ class QueueEntry(val queue:Queue, val se
queue.size += size
state = new Loaded(delivery)
+ } else {
+// debug("Ignoring store load of: ", messageKey)
}
}
override def tombstone = {
if( loading ) {
+// debug("Tombstoned, will ignore store load of seq: ", seq)
loading = false
queue.loadingSize -= size
}
@@ -1020,6 +1021,7 @@ class Subscription(queue:Queue) extends
// setting a new position..
if( pos!=null ) {
// Remove the previous pos from the prefetch counters.
+ pos.prefetched -= 1
removePrefetch(pos)
cursoredCounter += pos.size
}
@@ -1046,7 +1048,6 @@ class Subscription(queue:Queue) extends
}
def removePrefetch(value:QueueEntry):Unit = {
- value.prefetched -= 1
prefetchSize -= value.size
fillPrefetch()
}