You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:14:50 UTC
svn commit: r961185 -
/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Author: chirino
Date: Wed Jul 7 04:14:50 2010
New Revision: 961185
URL: http://svn.apache.org/viewvc?rev=961185&view=rev
Log:
combine flushed queue entries to reduce memory usage.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961185&r1=961184&r2=961185&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 04:14:50 2010
@@ -94,10 +94,6 @@ class Queue(val host: VirtualHost, val d
var tail_entry = new QueueEntry(this, next_message_seq).tail
entries.addFirst(head_entry)
- var loading_size = 0
- var flushing_size = 0
-
-
//
// Tuning options.
//
@@ -140,7 +136,7 @@ class Queue(val host: VirtualHost, val d
* reference pointers to the actual messages. When not loaded,
* the batch is referenced as sequence range to conserve memory.
*/
- def tune_entry_group_size = 10000
+ def tune_flush_range_size = 10000
/**
* The number of intervals that a consumer must not meeting the subscription rate before it is
@@ -155,11 +151,13 @@ class Queue(val host: VirtualHost, val d
var nack_item_counter = 0L
var nack_size_counter = 0L
- var flushed_items = 0L
-
def queue_size = enqueue_size_counter - dequeue_size_counter
def queue_items = enqueue_item_counter - dequeue_item_counter
+ var loading_size = 0
+ var flushing_size = 0
+ var flushed_items = 0
+
private var capacity = tune_producer_buffer
var size = 0
@@ -181,7 +179,7 @@ class Queue(val host: VirtualHost, val d
}
if( tune_persistent ) {
- host.store.listQueueEntryRanges(queueKey, tune_entry_group_size) { ranges=>
+ host.store.listQueueEntryRanges(queueKey, tune_flush_range_size) { ranges=>
dispatchQueue {
if( !ranges.isEmpty ) {
@@ -276,15 +274,15 @@ class Queue(val host: VirtualHost, val d
var total_items = 0L
var total_size = 0L
while (cur != null) {
- if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_flushed_group ) {
+ if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_flushed_range ) {
info(" => " + cur)
}
total_size += cur.size
if (cur.is_flushed || cur.is_loaded) {
total_items += 1
- } else if (cur.is_flushed_group ) {
- total_items += cur.as_flushed_group.count
+ } else if (cur.is_flushed_range ) {
+ total_items += cur.as_flushed_range.count
}
cur = cur.getNext
@@ -304,19 +302,16 @@ class Queue(val host: VirtualHost, val d
def schedual_slow_consumer_check:Unit = {
def slowConsumerCheck = {
- if( retained > 0 ) {
+ if( serviceState.isStarted ) {
// Handy for periodically looking at the dispatch state...
check_counter += 1
- if( (check_counter%10)==0 ) {
-// display_stats
- }
-
if( (check_counter%25)==0 ) {
-// if (!all_subscriptions.isEmpty) {
-// display_active_entries
-// }
+ display_stats
+ if (!all_subscriptions.isEmpty) {
+ display_active_entries
+ }
}
// target tune_min_subscription_rate / sec
@@ -366,38 +361,83 @@ class Queue(val host: VirtualHost, val d
}
- // If we no longer have fast subs...
- if( startedWithFastSubs && fast_subscriptions.isEmpty ) {
- // Flush out the tail entries..
- var cur = entries.getTail
- while( cur!=null ) {
- if( !cur.hasSubs && !cur.is_prefetched ) {
- cur
+ if (tune_flush_to_store) {
+
+ // If we no longer have fast subs...
+ if( startedWithFastSubs && fast_subscriptions.isEmpty ) {
+
+ // flush tail entries that are still loaded but which have no fast subs that can process them.
+ var cur = entries.getTail
+ while( cur!=null ) {
+ def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= cur.seq ).isDefined
+ if( cur.is_loaded && !cur.hasSubs && !cur.is_prefetched && !cur.as_loaded.acquired && !haveQuickConsumer ) {
+ // then flush out to make space...
+ cur.flush(true)
+ cur = cur.getPrevious
+ } else {
+ cur = null
+ }
}
- cur = cur.getPrevious
+
}
- }
+ // Combine flushed items into flushed ranges
+ if( flushed_items > tune_flush_range_size*2 ) {
- // flush tail entries that are still loaded but which have no fast subs that can process them.
- var cur = entries.getTail
- while( cur!=null ) {
- def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= cur.seq ).isDefined
- if( cur.is_loaded && !cur.hasSubs && !cur.is_prefetched && !cur.as_loaded.acquired && !haveQuickConsumer ) {
- // then flush out to make space...
- cur.flush(true)
- cur = cur.getPrevious
- } else {
- cur = null
- }
- }
+ println("Looking for flushed entries to combine")
+
+ var distance_from_sub = tune_flush_range_size;
+ var cur = entries.getHead
+ var combine_counter = 0;
+
+ while( cur!=null ) {
+
+ // get the next now.. since cur may get combined and unlinked
+ // from the entry list.
+ val next = cur.getNext
+
+ if( cur.hasSubs || cur.is_prefetched ) {
+ distance_from_sub = 0
+ } else {
+ distance_from_sub += 1
+ if( cur.can_combine_with_prev ) {
+ cur.getPrevious.as_flushed_range.combineNext
+ } else {
+ if( cur.is_flushed && distance_from_sub > tune_flush_range_size ) {
+ cur.flush_range
+ }
+ }
+ }
+ cur = next
+ }
- // Trigger a swap if we have consumers waiting for messages and we are full..
- if( idleConsumerCount > 0 && messages.full && flushing_size==0 ) {
- swap
+ println("combined "+combine_counter+" entries")
+
+ }
+
+// // Trigger a swap if we have consumers waiting for messages and we are full..
+// if( idleConsumerCount > 0 && messages.full && flushing_size==0 ) {
+//
+// debug("swapping...")
+// var entry = head_entry.getNext
+// while( entry!=null ) {
+// val loaded = entry.as_loaded
+//
+// // Keep around prefetched and loaded entries.
+// if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
+// entry.load
+// } else {
+// // flush the the others out of memory.
+// entry.flush(true)
+// }
+// entry = entry.getNext
+// }
+//
+// }
}
+
schedual_slow_consumer_check
}
}
@@ -519,35 +559,6 @@ class Queue(val host: VirtualHost, val d
rc
}
- /**
- * Prioritizes all the queue entries so that entries most likely to be consumed
- * next are a higher priority. All messages with the highest priority are loaded
- * and messages with the lowest priority are flushed to make room to accept more
- * messages from the producer.
- */
- def swap():Unit = {
- if( !host.serviceState.isStarted ) {
- return
- }
-
- debug("swapping...")
-
- var entry = head_entry.getNext
- while( entry!=null ) {
- val loaded = entry.as_loaded
-
- // Keep around prefetched and loaded entries.
- if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
- entry.load
- } else {
- // flush the the others out of memory.
- entry.flush(true)
- }
- entry = entry.getNext
- }
- }
-
-
val store_flush_source = createSource(new ListEventAggregator[QueueEntry#Loaded](), dispatchQueue)
store_flush_source.setEventHandler(^ {drain_store_flushes});
store_flush_source.resume
@@ -700,7 +711,7 @@ class QueueEntry(val queue:Queue, val se
def as_tail = state.as_tail
def as_flushed = state.as_flushed
- def as_flushed_group = state.as_flushed_group
+ def as_flushed_range = state.as_flushed_range
def as_loaded = state.as_loaded
def is_tail = this == queue.tail_entry
@@ -708,9 +719,10 @@ class QueueEntry(val queue:Queue, val se
def is_loaded = as_loaded!=null
def is_flushed = as_flushed!=null
- def is_flushed_group = as_flushed_group!=null
+ def is_flushed_range = as_flushed_range!=null
// These should not change the current state.
+ def count = state.count
def size = state.size
def messageKey = state.messageKey
def is_flushed_or_flushing = state.is_flushed_or_flushing
@@ -721,6 +733,15 @@ class QueueEntry(val queue:Queue, val se
def load = state.load
def remove = state.remove
+ def flush_range = state.flush_range
+
+ def can_combine_with_prev = {
+ getPrevious !=null &&
+ getPrevious.is_flushed_range &&
+ ( is_flushed || is_flushed_range ) &&
+ (getPrevious.count + count < queue.tune_flush_range_size)
+ }
+
trait EntryState {
final def entry:QueueEntry = QueueEntry.this
@@ -728,7 +749,7 @@ class QueueEntry(val queue:Queue, val se
def as_tail:Tail = null
def as_loaded:Loaded = null
def as_flushed:Flushed = null
- def as_flushed_group:FlushedRange = null
+ def as_flushed_range:FlushedRange = null
def as_head:Head = null
/**
@@ -737,6 +758,11 @@ class QueueEntry(val queue:Queue, val se
def size = 0
/**
+ * Gets number of messages that this entry represents
+ */
+ def count = 0
+
+ /**
* Gets the message key for the entry.
* @returns -1 if it is not known.
*/
@@ -763,6 +789,8 @@ class QueueEntry(val queue:Queue, val se
*/
def flush(asap:Boolean) = {}
+ def flush_range:Unit = throw new AssertionError("should only be called on flushed entries");
+
/**
* Takes the current entry out of the prefetch of all subscriptions
* which have prefetched the entry. Runs the partial function then
@@ -893,6 +921,7 @@ class QueueEntry(val queue:Queue, val se
override def toString = { "loaded:{ stored: "+stored+", flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
+ override def count = 1
override def size = delivery.size
override def messageKey = delivery.storeKey
@@ -962,6 +991,10 @@ class QueueEntry(val queue:Queue, val se
queue.flushing_size-=size
queue.size -= size
state = new Flushed(delivery.storeKey, size)
+
+ if( can_combine_with_prev ) {
+ getPrevious.as_flushed_range.combineNext
+ }
}
}
@@ -1074,8 +1107,12 @@ class QueueEntry(val queue:Queue, val se
*/
class Flushed(override val messageKey:Long, override val size:Int) extends EntryState {
+ queue.flushed_items += 1
+
var loading = false
+ override def count = 1
+
override def as_flushed = this
override def is_flushed_or_flushing = true
@@ -1121,6 +1158,7 @@ class QueueEntry(val queue:Queue, val se
delivery.storeKey = messageRecord.key
queue.size += size
+ queue.flushed_items -= 1
state = new Loaded(delivery, true)
} else {
// debug("Ignoring store load of: ", messageKey)
@@ -1133,8 +1171,18 @@ class QueueEntry(val queue:Queue, val se
loading = false
queue.loading_size -= size
}
+ queue.flushed_items -= 1
super.remove
}
+
+ override def flush_range = {
+ if( loading ) {
+ loading = false
+ queue.loading_size -= size
+ }
+ queue.flushed_items -= 1
+ state = new FlushedRange(seq, 1, size)
+ }
}
/**
@@ -1151,13 +1199,16 @@ class QueueEntry(val queue:Queue, val se
/** the last seq id in the range */
var last:Long,
/** the number of items in the range */
- var count:Int,
+ var _count:Int,
/** size in bytes of the range */
- override val size:Int) extends EntryState {
+ var _size:Int) extends EntryState {
+
+ override def count = _count
+ override def size = _size
var loading = false
- override def as_flushed_group = this
+ override def as_flushed_range = this
override def is_flushed_or_flushing = true
@@ -1210,9 +1261,28 @@ class QueueEntry(val queue:Queue, val se
}
}
- override def remove = {
- throw new AssertionError("Flushed range cannbot be removed.");
+ /**
+ * Combines this queue entry with the next queue entry.
+ */
+ def combineNext():Unit = {
+ val value = getNext
+ assert(value!=null)
+ assert(value.is_flushed || value.is_flushed_range)
+ if( value.is_flushed ) {
+ assert(last < value.seq )
+ last = value.seq
+ _count += 1
+ _size += value.size
+ value.remove
+ } else if( value.is_flushed_range ) {
+ assert(last < value.seq )
+ last = value.as_flushed_range.last
+ _count += value.as_flushed_range.count
+ _size += value.size
+ value.remove
+ }
}
+
}
@@ -1476,13 +1546,17 @@ class PrefetchingSubscription(queue:Queu
assert( prefetched_size == 0 , "inconsistent prefetch size.")
} else {
prefetch_head = prefetch_head.getNext
- assert( prefetched_size != 0 , "inconsistent prefetch size.")
+ if( prefetched_size == 0 ) {
+ assert( prefetched_size != 0 , "inconsistent prefetch size.")
+ }
}
} else {
if( entry == prefetch_tail ) {
prefetch_tail = prefetch_tail.getPrevious
}
- assert( prefetched_size != 0 , "inconsistent prefetch size.")
+ if( prefetched_size == 0 ) {
+ assert( prefetched_size != 0 , "inconsistent prefetch size.")
+ }
}
}