You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/02/08 16:06:34 UTC
svn commit: r1241924 -
/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Author: chirino
Date: Wed Feb 8 15:06:33 2012
New Revision: 1241924
URL: http://svn.apache.org/viewvc?rev=1241924&view=rev
Log:
Don't combine swap entries while they are loading. Fixes unlink errors.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1241924&r1=1241923&r2=1241924&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Feb 8 15:06:33 2012
@@ -1232,6 +1232,11 @@ class QueueEntry(val queue:Queue, val se
def is_swapped = as_swapped!=null
def is_swapped_range = as_swapped_range!=null
def is_swapped_or_swapped_range = is_swapped || is_swapped_range
+ def is_loading = state match {
+ case state:SwappedRange => state.loading
+ case state:Swapped => state.loading
+ case _ => false
+ }
// These should not change the current state.
def count = state.count
@@ -1256,7 +1261,7 @@ class QueueEntry(val queue:Queue, val se
getPrevious !=null &&
getPrevious.is_swapped_range &&
( (is_swapped && !is_acquired) || is_swapped_range ) &&
- (getPrevious.count + count < queue.tune_swap_range_size)
+ (getPrevious.count + count < queue.tune_swap_range_size) && !is_loading
}
trait EntryState {
@@ -1737,6 +1742,8 @@ class QueueEntry(val queue:Queue, val se
override def toString = { "swapped:{ swapping_in: "+space+", acquired:"+acquirer+", size:"+size+"}" }
+ def loading = this.space!=null
+
override def swap_in(mem_space:MemorySpace) = {
if( this.space==null ) {
// trace("Start entry load of message seq: %s", seq)
@@ -1887,64 +1894,61 @@ class QueueEntry(val queue:Queue, val se
override def size = _size
override def expiration = _expiration
- var swapping_in = false
+ var loading = false
override def as_swapped_range = this
override def is_swapped_or_swapping_out = true
+
def label = {
var rc = "swapped_range"
- if( swapping_in ) {
+ if( loading ) {
rc = "swapped_range|swapping in"
}
rc
}
- override def toString = { "swapped_range:{ swapping_in: "+swapping_in+", count: "+count+", size: "+size+"}" }
+ override def toString = { "swapped_range:{ swapping_in: "+loading+", count: "+count+", size: "+size+"}" }
override def swap_in(space:MemorySpace) = {
- if( !swapping_in ) {
- swapping_in = true
+ if( !loading ) {
+ loading = true
queue.virtual_host.store.list_queue_entries(queue.store_id, seq, last) { records =>
- if( !records.isEmpty ) {
- queue.dispatch_queue {
-
- var item_count=0
- var size_count=0
-
- val tmpList = new LinkedNodeList[QueueEntry]()
- records.foreach { record =>
- val entry = new QueueEntry(queue, record.entry_seq).init(record)
- tmpList.addLast(entry)
- item_count += 1
- size_count += record.size
- }
-
- // we may need to adjust the enqueue count if entries
- // were dropped at the store level
- var item_delta = (count - item_count)
- val size_delta: Int = size - size_count
-
- if ( item_delta!=0 || size_delta!=0 ) {
- info("Detected store change in range %d to %d. %d message(s) and %d bytes", seq, last, item_delta, size_delta)
- queue.enqueue_item_counter += item_delta
- queue.enqueue_size_counter += size_delta
- }
+ queue.dispatch_queue {
+ loading = false
+ assert(isLinked)
+
+ var item_count=0
+ var size_count=0
+
+ val tmpList = new LinkedNodeList[QueueEntry]()
+ records.foreach { record =>
+ val entry = new QueueEntry(queue, record.entry_seq).init(record)
+ tmpList.addLast(entry)
+ item_count += 1
+ size_count += record.size
+ }
- linkAfter(tmpList)
- val next = getNext
+ // we may need to adjust the enqueue count if entries
+ // were dropped at the store level
+ var item_delta = (count - item_count)
+ val size_delta: Int = size - size_count
+
+ if ( item_delta!=0 || size_delta!=0 ) {
+ info("Detected store change in range %d to %d. %d message(s) and %d bytes", seq, last, item_delta, size_delta)
+ queue.enqueue_item_counter += item_delta
+ queue.enqueue_size_counter += size_delta
+ }
- // move the subs to the first entry that we just loaded.
- parked.foreach(_.advance(next))
- next :::= parked
- queue.trigger_swap
+ linkAfter(tmpList)
+ val next = getNext
- unlink
+ // move the subs to the first entry that we just loaded.
+ parked.foreach(_.advance(next))
+ next :::= parked
+ queue.trigger_swap
- // TODO: refill prefetches
- }
- } else {
- warn("range load failed")
+ unlink
}
}
}
@@ -1958,6 +1962,7 @@ class QueueEntry(val queue:Queue, val se
assert(value!=null)
assert(value.is_swapped || value.is_swapped_range)
assert(!value.is_acquired)
+ assert(!value.is_loading)
if( value.is_swapped ) {
assert(last < value.seq )
last = value.seq