You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2011/06/09 04:38:13 UTC
svn commit: r1133634 -
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Author: chirino
Date: Thu Jun 9 02:38:13 2011
New Revision: 1133634
URL: http://svn.apache.org/viewvc?rev=1133634&view=rev
Log:
Implementing better flow control. We were removing messages from the swapped in counters too soon, they have to be in the counters while they are being stored to disk.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1133634&r1=1133633&r2=1133634&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Thu Jun 9 02:38:13 2011
@@ -840,7 +840,7 @@ class QueueEntry(val queue:Queue, val se
* Removes the entry from the queue's linked list of entries. This gets called
* as a result of an aquired ack.
*/
- def remove = {
+ def remove:Unit = {
// advance subscriptions that were on this entry..
advance(parked)
parked = Nil
@@ -919,6 +919,7 @@ class QueueEntry(val queue:Queue, val se
var acquired = false
var swapping_out = false
+ var storing = false
def label = {
var rc = "loaded"
@@ -936,6 +937,7 @@ class QueueEntry(val queue:Queue, val se
override def count = 1
override def size = delivery.size
override def message_key = delivery.storeKey
+ var remove_pending = false
override def is_swapped_or_swapping_out = {
swapping_out
@@ -944,53 +946,50 @@ class QueueEntry(val queue:Queue, val se
override def as_loaded = this
def store = {
- delivery.uow.enqueue(toQueueEntryRecord)
- delivery.uow.on_complete {
- queue.swap_out_completes_source.merge(this)
+ if(!storing) {
+ storing = true
+ delivery.uow.enqueue(toQueueEntryRecord)
+ delivery.uow.on_complete {
+ queue.swap_out_completes_source.merge(this)
+ }
}
}
override def swap_out(asap:Boolean) = {
- if( queue.tune_swap ) {
+ if( queue.tune_swap && !swapping_out ) {
+ swapping_out=true
+
+ queue.swapping_out_size+=size
if( stored ) {
- swapping_out=true
- queue.swapping_out_size+=size
swapped_out
} else {
- if( !swapping_out ) {
- swapping_out=true
- queue.swapping_out_size+=size
- // The storeBatch is only set when called from the messages.offer method
- if( delivery.uow!=null ) {
+ // The storeBatch is only set when called from the messages.offer method
+ if( delivery.uow!=null ) {
+ if( asap ) {
+ delivery.uow.complete_asap
+ }
+ } else {
+
+ // Are we swapping out a non-persistent message?
+ if( !storing ) {
+ assert( delivery.storeKey == -1 )
+
+ delivery.uow = queue.virtual_host.store.create_uow
+ val uow = delivery.uow
+ delivery.storeKey = uow.store(delivery.createMessageRecord)
+ store
if( asap ) {
- delivery.uow.complete_asap
+ uow.complete_asap
}
- } else {
+ uow.release
+ delivery.uow = null
- // Are swapping out a non-persistent message?
- if( delivery.storeKey == -1 ) {
-
- delivery.uow = queue.virtual_host.store.create_uow
- val uow = delivery.uow
- delivery.storeKey = uow.store(delivery.createMessageRecord)
- store
- if( asap ) {
- uow.complete_asap
- }
- uow.release
- delivery.uow = null
-
- } else {
-
- if( asap ) {
- queue.virtual_host.store.flush_message(message_key) {
- queue.swap_out_completes_source.merge(this)
- }
+ } else {
+ if( asap ) {
+ queue.virtual_host.store.flush_message(message_key) {
}
-
}
-
}
}
}
@@ -998,6 +997,8 @@ class QueueEntry(val queue:Queue, val se
}
def swapped_out() = {
+ assert( state == this )
+ storing = false
stored = true
delivery.uow = null
if( swapping_out ) {
@@ -1013,6 +1014,16 @@ class QueueEntry(val queue:Queue, val se
if( can_combine_with_prev ) {
getPrevious.as_swapped_range.combineNext
}
+ if( remove_pending ) {
+ state.remove
+ }
+ } else {
+ if( remove_pending ) {
+ delivery.message.release
+ queue.swapped_in_size -= size
+ queue.swapped_in_items -= 1
+ super.remove
+ }
}
}
@@ -1024,14 +1035,14 @@ class QueueEntry(val queue:Queue, val se
}
override def remove = {
- if( swapping_out ) {
- swapping_out = false
- queue.swapping_out_size-=size
+ if( storing | remove_pending ) {
+ remove_pending = true
+ } else {
+ delivery.message.release
+ queue.swapped_in_size -= size
+ queue.swapped_in_items -= 1
+ super.remove
}
- delivery.message.release
- queue.swapped_in_size -= size
- queue.swapped_in_items -= 1
- super.remove
}
override def dispatch():Boolean = {