You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:13:40 UTC

svn commit: r961172 - /activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Author: chirino
Date: Wed Jul  7 04:13:40 2010
New Revision: 961172

URL: http://svn.apache.org/viewvc?rev=961172&view=rev
Log:
Fixed dispatching problems with messages larger than the consumer prefetch window.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961172&r1=961171&r2=961172&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:13:40 2010
@@ -308,7 +308,7 @@ class Queue(val host: VirtualHost, val d
         check_counter += 1
 
         if( (check_counter%10)==0  ) {
-          display_stats
+//          display_stats
         }
 
         if( (check_counter%25)==0 ) {
@@ -985,7 +985,9 @@ class QueueEntry(val queue:Queue, val se
               // advance: not interested.
               advancing += sub
             } else {
-              if( sub.full ) {
+
+              // Is the sub flow controlled?
+              if( sub.full || (sub.prefetchFull && !sub.is_prefetched(entry) ) ) {
                 // hold back: flow controlled
                 heldBack += sub
               } else {
@@ -1009,13 +1011,7 @@ class QueueEntry(val queue:Queue, val se
       // The acquiring sub is added last to the list so that
       // the other competing subs get first dibs at the next entry.
       if( acquiringSub != null ) {
-
-        // Advancing may need to be held back because the sub's prefetch is full
-        if( acquiringSub.prefetchFull && !acquiringSub.is_prefetched(getNext) ) {
-          heldBack += acquiringSub
-        } else {
-          advancing += acquiringSub
-        }
+        advancing += acquiringSub
       }
 
       if ( advancing.isEmpty ) {
@@ -1233,7 +1229,7 @@ class Subscription(queue:Queue) extends 
   }
 
   def close() = {
-    pos.-=(this)
+    pos -= this
 
     invalidate_prefetch
 
@@ -1249,11 +1245,6 @@ class Subscription(queue:Queue) extends 
       next = next.getNext
       cur.nack // this unlinks the entry.
     }
-
-    // show the queue entries... after we disconnect.
-    queue.dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{
-      queue.display_active_entries
-    })
   }
 
   /**
@@ -1262,11 +1253,14 @@ class Subscription(queue:Queue) extends 
    */
   def advance(value:QueueEntry):Unit = {
     assert(value!=null)
+    assert(pos!=null) 
 
     // Remove the previous pos from the prefetch counters.
     if( prefetch_tail!=null && !pos.is_head) {
       remove_from_prefetch(pos)
     }
+
+
     advanced_size += pos.size
 
     pos = value
@@ -1399,8 +1393,12 @@ class Subscription(queue:Queue) extends 
 
       // we may now be able to prefetch some messages..
       acquired_size -= entry.size
+
+      val next = entry.nextOrTail
       entry.remove // entry size changes to 0
+
       refill_prefetch
+      next.run
     }
 
     def nack = {