You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/09 04:38:13 UTC

svn commit: r1133634 - /activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Author: chirino
Date: Thu Jun  9 02:38:13 2011
New Revision: 1133634

URL: http://svn.apache.org/viewvc?rev=1133634&view=rev
Log:
Implementing better flow control.  We were removing messages from the swapped-in counters too soon; they have to remain in the counters while they are being stored to disk.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1133634&r1=1133633&r2=1133634&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Jun  9 02:38:13 2011
@@ -840,7 +840,7 @@ class QueueEntry(val queue:Queue, val se
      * Removes the entry from the queue's linked list of entries.  This gets called
      * as a result of an aquired ack.
      */
-    def remove = {
+    def remove:Unit = {
       // advance subscriptions that were on this entry..
       advance(parked)
       parked = Nil
@@ -919,6 +919,7 @@ class QueueEntry(val queue:Queue, val se
 
     var acquired = false
     var swapping_out = false
+    var storing = false
 
     def label = {
       var rc = "loaded"
@@ -936,6 +937,7 @@ class QueueEntry(val queue:Queue, val se
     override def count = 1
     override def size = delivery.size
     override def message_key = delivery.storeKey
+    var remove_pending = false
 
     override def is_swapped_or_swapping_out = {
       swapping_out
@@ -944,53 +946,50 @@ class QueueEntry(val queue:Queue, val se
     override  def as_loaded = this
 
     def store = {
-      delivery.uow.enqueue(toQueueEntryRecord)
-      delivery.uow.on_complete {
-        queue.swap_out_completes_source.merge(this)
+      if(!storing) {
+        storing = true
+        delivery.uow.enqueue(toQueueEntryRecord)
+        delivery.uow.on_complete {
+          queue.swap_out_completes_source.merge(this)
+        }
       }
     }
 
     override def swap_out(asap:Boolean) = {
-      if( queue.tune_swap ) {
+      if( queue.tune_swap && !swapping_out ) {
+        swapping_out=true
+
+        queue.swapping_out_size+=size
         if( stored ) {
-          swapping_out=true
-          queue.swapping_out_size+=size
           swapped_out
         } else {
-          if( !swapping_out ) {
-            swapping_out=true
-            queue.swapping_out_size+=size
 
-            // The storeBatch is only set when called from the messages.offer method
-            if( delivery.uow!=null ) {
+          // The storeBatch is only set when called from the messages.offer method
+          if( delivery.uow!=null ) {
+            if( asap ) {
+              delivery.uow.complete_asap
+            }
+          } else {
+
+            // Are we swapping out a non-persistent message?
+            if( !storing ) {
+              assert( delivery.storeKey == -1 )
+
+              delivery.uow = queue.virtual_host.store.create_uow
+              val uow = delivery.uow
+              delivery.storeKey = uow.store(delivery.createMessageRecord)
+              store
               if( asap ) {
-                delivery.uow.complete_asap
+                uow.complete_asap
               }
-            } else {
+              uow.release
+              delivery.uow = null
 
-              // Are swapping out a non-persistent message?
-              if( delivery.storeKey == -1 ) {
-                
-                delivery.uow = queue.virtual_host.store.create_uow
-                val uow = delivery.uow
-                delivery.storeKey = uow.store(delivery.createMessageRecord)
-                store
-                if( asap ) {
-                  uow.complete_asap
-                }
-                uow.release
-                delivery.uow = null
-
-              } else {
-                  
-                if( asap ) {
-                  queue.virtual_host.store.flush_message(message_key) {
-                    queue.swap_out_completes_source.merge(this)
-                  }
+            } else {
+              if( asap ) {
+                queue.virtual_host.store.flush_message(message_key) {
                 }
-
               }
-
             }
           }
         }
@@ -998,6 +997,8 @@ class QueueEntry(val queue:Queue, val se
     }
 
     def swapped_out() = {
+      assert( state == this )
+      storing = false
       stored = true
       delivery.uow = null
       if( swapping_out ) {
@@ -1013,6 +1014,16 @@ class QueueEntry(val queue:Queue, val se
         if( can_combine_with_prev ) {
           getPrevious.as_swapped_range.combineNext
         }
+        if( remove_pending ) {
+          state.remove
+        }
+      } else {
+        if( remove_pending ) {
+          delivery.message.release
+          queue.swapped_in_size -= size
+          queue.swapped_in_items -= 1
+          super.remove
+        }
       }
     }
 
@@ -1024,14 +1035,14 @@ class QueueEntry(val queue:Queue, val se
     }
 
     override def remove = {
-      if( swapping_out ) {
-        swapping_out = false
-        queue.swapping_out_size-=size
+      if( storing | remove_pending ) {
+        remove_pending = true
+      } else {
+        delivery.message.release
+        queue.swapped_in_size -= size
+        queue.swapped_in_items -= 1
+        super.remove
       }
-      delivery.message.release
-      queue.swapped_in_size -= size
-      queue.swapped_in_items -= 1
-      super.remove
     }
 
     override def dispatch():Boolean = {