Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:11:40 UTC

svn commit: r961155 - /activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Author: chirino
Date: Wed Jul  7 04:11:40 2010
New Revision: 961155

URL: http://svn.apache.org/viewvc?rev=961155&view=rev
Log:
Improved queue entry state tracking.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961155&r1=961154&r2=961155&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:11:40 2010
@@ -257,7 +257,7 @@ class Queue(val host: VirtualHost, val d
               }
             } else {
               if( !sub.slow ) {
-                debug("slow interval: %d, %d, %d", sub.slowIntervals, sub.tailParkings, cursorDelta)
+//                debug("slow interval: %d, %d, %d", sub.slowIntervals, sub.tailParkings, cursorDelta)
                 sub.slowIntervals += 1
                 if( sub.slow ) {
                   debug("consumer is slow: %s", consumer)
@@ -292,7 +292,7 @@ class Queue(val host: VirtualHost, val d
         }
 
 
-        // Trigger a swap if we have slow consumers and we are full..
+        // Trigger a swap if we have consumers waiting for messages and we are full..
         if( idleConsumerCount > 0 && messages.full && flushingSize==0 ) {
           swap
         }
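
For readers skimming the diff: the swap is gated on three conditions at once. The sketch below is a self-contained model of that guard; MiniQueue, capacity and swap() are illustrative stand-ins, and only the three-part condition mirrors the patched line.

    // Illustrative model of the swap trigger; not the real Apollo Queue API.
    class MiniQueue(capacity: Int) {
      var size = 0                // bytes of loaded messages
      var idleConsumerCount = 0   // consumers parked waiting for messages
      var flushingSize = 0        // bytes currently being flushed to the store

      def full: Boolean = size >= capacity

      def maybeSwap(): Boolean =
        // mirrors: idleConsumerCount > 0 && messages.full && flushingSize == 0
        if (idleConsumerCount > 0 && full && flushingSize == 0) { swap(); true }
        else false

      private def swap(): Unit = ()  // would move loaded entries out to the store
    }
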
@@ -633,7 +633,6 @@ class QueueEntry(val queue:Queue, val se
     def flush = entry
 
     def tombstone = {
-      queue.size -= size
 
       // Update the prefetch counter to reflect that this entry is no longer being prefetched.
       var cur = entry
@@ -642,9 +641,6 @@ class QueueEntry(val queue:Queue, val se
           (cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(entry) ) { sub.removePrefetch(entry) } }
         }
         cur = cur.getPrevious
-
-        // Sanity check.. we should always stop before we get to the last entry in the list.
-        assert( cur != null , "illegal prefetch state detected.")
       }
 
       // if rv and lv are both adjacent tombstones, then this merges the rv
@@ -774,6 +770,7 @@ class QueueEntry(val queue:Queue, val se
         flushing = false
         queue.flushingSize-=size
       }
+      queue.size -= size
       super.tombstone
     }
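
The tombstone hunks above move the queue.size bookkeeping: the base tombstone no longer subtracts it, and the state that owns the flushing flag, which appears to be the Loaded state, now does. The sketch below is a simplified model of that invariant, where only loaded entries ever contribute to queue.size; the class names are stand-ins, not the real state hierarchy.

    // Simplified model: only loaded entries count toward queue.size,
    // so only the loaded state's tombstone subtracts from it.
    class SizeSketchQueue { var size = 0 }

    abstract class EntryStateSketch(queue: SizeSketchQueue, entrySize: Int) {
      def tombstone(): Unit = ()              // base: no size accounting here any more
    }

    class LoadedSketch(queue: SizeSketchQueue, entrySize: Int)
        extends EntryStateSketch(queue, entrySize) {
      queue.size += entrySize                 // counted while the entry sits in memory
      override def tombstone(): Unit = {
        queue.size -= entrySize               // ...and released when it is tombstoned
        super.tombstone()
      }
    }

    class FlushedSketch(queue: SizeSketchQueue, entrySize: Int)
        extends EntryStateSketch(queue, entrySize)  // never counted, never subtracted
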
 
@@ -920,27 +917,24 @@ class QueueEntry(val queue:Queue, val se
     // Flushed entries can't be dispatched until
     // they get loaded.
     def dispatch():QueueEntry = {
-      if( !loading ) {
-        var remaining = queue.tune_subscription_buffer - size
-        load
+      if( !loading && hasSubs) {
 
-        // make sure the next few entries are loaded too..
-        var cur = getNext
-        while( remaining>0 && cur!=null ) {
-          remaining -= cur.size
-          val flushed = cur.asFlushed
-          if( flushed!=null && !flushed.loading) {
-            flushed.load
-          }
-          cur = getNext
+        // I don't think this should ever happen as we should be prefetching the
+        // entry before we dispatch it.
+        warn("dispatch called on a flushed entry that is not loading.")
+
+        // ask the subs to fill the prefetches.. that should
+        // kick off a load
+        (browsing ::: competing).foreach { sub =>
+          sub.fillPrefetch
         }
-
       }
       null
     }
 
     override def load() = {
       if( !loading ) {
+//        trace("Start entry load of message seq: %s", seq)
         // start loading it back...
         loading = true
         queue.loadingSize += size
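
Note how dispatch on a flushed entry no longer walks ahead loading neighbours itself: it warns and asks the subscriptions to refill their prefetch, which is what kicks off the load. The sketch below models just that control flow; SketchSub, hasSubs and the warning text are local stand-ins for the real interfaces.

    // Local model of dispatching a flushed (not yet loaded) entry.
    trait SketchSub { def fillPrefetch(): Unit }

    class FlushedDispatchSketch(subs: List[SketchSub]) {
      var loading = false
      def hasSubs: Boolean = subs.nonEmpty

      def dispatch(): Unit = {
        if (!loading && hasSubs) {
          // Should not normally happen: the entry ought to have been prefetched
          // (and therefore already be loading) before dispatch reaches it.
          Console.err.println("dispatch called on a flushed entry that is not loading.")
          // Refilling the subscriptions' prefetch windows is what triggers the load.
          subs.foreach(_.fillPrefetch())
        }
      }
    }
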
@@ -948,12 +942,15 @@ class QueueEntry(val queue:Queue, val se
           // pass off to a source so it can aggregate multiple
           // loads to reduce cross thread synchronization
           if( delivery.isDefined ) {
+//            debug("Store found message seq: %d", seq)
             queue.store_load_source.merge((this, delivery.get))
           } else {
+
+            debug("Detected store drop of message seq: %d", seq)
+
             // Looks like someone else removed the message from the store.. lets just
             // tombstone this entry now.
             queue.dispatchQueue {
-              debug("Detected store drop of message key: %d", messageKey)
               tombstone
             }
           }
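
In the hunk above, a record found in the store is merged into store_load_source so several loads can be aggregated across threads, while a missing record means someone else already deleted the message, so the entry tombstones itself on the dispatch queue. The sketch below is a simplified, synchronous model of that branch; SketchStore, dispatchQueue and the callbacks are assumptions, and the real store API is asynchronous.

    // Synchronous stand-in for the store lookup branch inside load().
    trait SketchStore { def loadMessage(seq: Long): Option[Array[Byte]] }

    class StoreLoadSketch(seq: Long, size: Int, store: SketchStore,
                          dispatchQueue: (() => Unit) => Unit) {
      var loading = false
      var loadingSize = 0   // stands in for queue.loadingSize

      def load(): Unit = {
        if (!loading) {
          loading = true
          loadingSize += size
          store.loadMessage(seq) match {
            case Some(record) =>
              onLoaded(record)              // the real code merges into store_load_source
            case None =>
              // the message is already gone from the store; drop the entry instead
              dispatchQueue(() => tombstone())
          }
        }
      }
      def onLoaded(record: Array[Byte]): Unit = ()
      def tombstone(): Unit = if (loading) { loading = false; loadingSize -= size }
    }
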
@@ -964,6 +961,7 @@ class QueueEntry(val queue:Queue, val se
 
     def loaded(messageRecord:MessageRecord) = {
       if( loading ) {
+//        debug("Loaded message seq: ", seq )
         loading = false
         queue.loadingSize -= size
 
@@ -974,12 +972,15 @@ class QueueEntry(val queue:Queue, val se
 
         queue.size += size
         state = new Loaded(delivery)
+      } else {
+//        debug("Ignoring store load of: ", messageKey)
       }
     }
 
 
     override def tombstone = {
       if( loading ) {
+//        debug("Tombstoned, will ignore store load of seq: ", seq)
         loading = false
         queue.loadingSize -= size
       }
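
The loaded()/tombstone hunks above lean on the loading flag to cancel an in-flight store load: if the entry is tombstoned first, the flag is cleared and the record is simply ignored when it eventually arrives. A small local model of that handshake follows; the field names are simplified stand-ins.

    // Minimal model of cancelling an in-flight load via the loading flag.
    class LoadCancelSketch(size: Int) {
      var loading = false
      var loadingSize = 0    // stands in for queue.loadingSize
      var loadedOk = false

      def startLoad(): Unit =
        if (!loading) { loading = true; loadingSize += size }

      // Store callback: applied only when the load was not cancelled in the meantime;
      // otherwise the entry was tombstoned while loading and the record is ignored.
      def loaded(): Unit =
        if (loading) { loading = false; loadingSize -= size; loadedOk = true }

      def tombstone(): Unit =
        if (loading) { loading = false; loadingSize -= size }
    }
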
@@ -1020,6 +1021,7 @@ class Subscription(queue:Queue) extends 
       // setting a new position..
       if( pos!=null ) {
         // Remove the previous pos from the prefetch counters.
+        pos.prefetched -= 1
         removePrefetch(pos)
         cursoredCounter += pos.size
       }
@@ -1046,7 +1048,6 @@ class Subscription(queue:Queue) extends 
   }
 
   def removePrefetch(value:QueueEntry):Unit = {
-    value.prefetched -= 1
     prefetchSize -= value.size
     fillPrefetch()
   }
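
The last two hunks move the per-entry prefetched decrement out of removePrefetch and into the spot where the subscription advances past its current position, leaving removePrefetch to adjust only the subscription-side window. The sketch below shows that split; SketchSubscription, SketchEntry and the stubbed fillPrefetch are local stand-ins, not the real classes.

    // Local model of the subscription-side prefetch accounting after this change.
    class SketchEntry(val size: Int) { var prefetched = 0 }

    class SketchSubscription {
      var prefetchSize = 0
      var cursoredCounter = 0L

      def addPrefetch(e: SketchEntry): Unit = { e.prefetched += 1; prefetchSize += e.size }

      // Advancing past the old position now owns the entry-side decrement...
      def advancePast(pos: SketchEntry): Unit = {
        pos.prefetched -= 1
        removePrefetch(pos)
        cursoredCounter += pos.size
      }

      // ...while removePrefetch only shrinks the window and tries to refill it.
      def removePrefetch(e: SketchEntry): Unit = { prefetchSize -= e.size; fillPrefetch() }

      def fillPrefetch(): Unit = ()   // would pull further entries into the window
    }
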