You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/06 03:25:48 UTC
svn commit: r1042514 - in /activemq/activemq-apollo/trunk:
apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/
apollo-dto/src/main/java/org/apache/activemq/apollo/dto/
apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/
Author: chirino
Date: Mon Dec 6 02:25:48 2010
New Revision: 1042514
URL: http://svn.apache.org/viewvc?rev=1042514&view=rev
Log:
- Simplified the prefetching model used in the queues.
Modified:
activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java
activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Dec 6 02:25:48 2010
@@ -46,7 +46,6 @@ class Queue(val host: VirtualHost, var i
var inbound_sessions = Set[DeliverySession]()
var all_subscriptions = Map[DeliveryConsumer, Subscription]()
- var fast_subscriptions = List[Subscription]()
val filter = binding.message_filter
@@ -83,29 +82,12 @@ class Queue(val host: VirtualHost, var i
/**
* The amount of memory buffer space for receiving messages.
*/
- def tune_producer_buffer = config.producer_buffer.getOrElse(1024*32)
+ def tune_producer_buffer = config.producer_buffer.getOrElse(32*1024)
/**
* The amount of memory buffer space for the queue..
*/
- def tune_queue_buffer = config.queue_buffer.getOrElse(1024*32)
-
- /**
- * Subscribers that consume slower than this rate per seconds will be considered
- * slow. Once a consumer is considered slow, we may switch to disk spooling.
- */
- def tune_slow_subscription_rate = config.slow_subscription_rate.getOrElse(500*1024)
-
- /**
- * The number of milliseconds between slow consumer checks.
- */
- def tune_slow_check_interval = config.slow_check_interval.getOrElse(500L)
-
- /**
- * The number of intervals that a consumer must not meeting the subscription rate before it is
- * flagged as a slow consumer.
- */
- def tune_max_slow_intervals = config.max_slow_intervals.getOrElse(10)
+ def tune_queue_buffer = config.queue_buffer.getOrElse(32*1024)
//
// Frequently accessed tuning configuration.
@@ -140,7 +122,7 @@ class Queue(val host: VirtualHost, var i
tune_persistent = host.store !=null && config.persistent.getOrElse(true)
tune_flush_to_store = tune_persistent && config.flush_to_store.getOrElse(true)
tune_flush_range_size = config.flush_range_size.getOrElse(10000)
- tune_consumer_buffer = config.consumer_buffer.getOrElse(1024*64)
+ tune_consumer_buffer = config.consumer_buffer.getOrElse(1024*128)
}
configure(config)
@@ -161,6 +143,10 @@ class Queue(val host: VirtualHost, var i
var capacity = 0
var capacity_used = 0
+ val swap_source = createSource(EventAggregators.INTEGER_ADD, dispatchQueue)
+ swap_source.setEventHandler(^{ swap_messages });
+ swap_source.resume
+
protected def _start(onCompleted: Runnable) = {
capacity = tune_queue_buffer;
@@ -169,14 +155,14 @@ class Queue(val host: VirtualHost, var i
// by the time this is run, consumers and producers may have already joined.
onCompleted.run
display_stats
- schedual_slow_consumer_check
+ schedual_consumer_sample
// wake up the producers to fill us up...
if (messages.refiller != null) {
messages.refiller.run
}
- // kick of dispatching to the consumers.
- all_subscriptions.valuesIterator.foreach( _.refill_prefetch )
+ // kick off dispatching to the consumers.
+ trigger_swap
dispatchQueue << head_entry
}
@@ -270,17 +256,12 @@ class Queue(val host: VirtualHost, var i
entry.as_loaded.store
}
- def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= entry.seq ).isDefined
-
var dispatched = false
- if( entry.hasSubs || haveQuickConsumer ) {
+ if( entry.hasSubs ) {
// try to dispatch it directly...
entry.dispatch
- } else {
- // we flush the entry out right away if it looks
- // it wont be needed.
- entry.flush(true)
}
+ trigger_swap
// release the store batch...
if (queueDelivery.uow != null) {
@@ -329,141 +310,100 @@ class Queue(val host: VirtualHost, var i
}
}
- def schedual_slow_consumer_check:Unit = {
-
- def slowConsumerCheck = {
- if( serviceState.isStarted ) {
-
- // target tune_min_subscription_rate / sec
- val slow_cursor_delta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
- var idleConsumerCount = 0
-
-
- var startedWithFastSubs = !fast_subscriptions.isEmpty
- fast_subscriptions = Nil
+ def trigger_swap = {
+ if( tune_flush_to_store ) {
+ swap_source.merge(1)
+ }
+ }
- all_subscriptions.foreach{ case (consumer, sub)=>
+ def swap_messages = {
- // Skip over new consumers...
- if( sub.advanced_size != 0 ) {
+ // reset the prefetch flags...
+ var cur = entries.getHead
+ while( cur!=null ) {
+ cur.prefetch_flags = 0
+ cur = cur.getNext
+ }
- val cursor_delta = sub.advanced_size - sub.last_advanced_size
- sub.last_advanced_size = sub.advanced_size
+ // Set the prefetch flags
+ all_subscriptions.valuesIterator.foreach( _.refill_prefetch )
- // If the subscription is NOT slow if it's it is tail parked or
- // it's been parking or at least advancing through the data at
- // the tune_slow_subscription_rate
- if( sub.tail_parked || sub.tail_parkings>0 || cursor_delta >= slow_cursor_delta ) {
- if( sub.slow ) {
- debug("subscription is now fast: %s", sub)
- sub.slow_intervals = 0
- }
- } else {
- if( !sub.slow ) {
- debug("slow interval: %d, %d, %d < %d", sub.slow_intervals, sub.tail_parkings, cursor_delta, slow_cursor_delta)
- sub.slow_intervals += 1
- if( sub.slow ) {
- debug("subscription is slow: %s", sub)
- }
- }
- }
+ // swap out messages.
+ cur = entries.getHead
+ while( cur!=null ) {
+ if( cur.is_loaded && cur.prefetch_flags==0 && !cur.as_loaded.acquired ) {
+ val flush_asap = !cur.as_loaded.acquired
+// display_active_entries
+ cur.flush(flush_asap)
+ }
+ cur = cur.getNext
+ }
- // has the consumer been stuck at the tail?
- if( sub.tail_parked && sub.tail_parkings==0 ) {
- idleConsumerCount += 1;
- }
- sub.tail_parkings = 0
- }
+ // Combine flushed items into flushed ranges
+ if( flushed_items > tune_flush_range_size*2 ) {
- if( !sub.slow ) {
- fast_subscriptions ::= sub
- }
- }
+ debug("Looking for flushed entries to combine")
+ var distance_from_sub = tune_flush_range_size;
+ var cur = entries.getHead
+ var combine_counter = 0;
- if (tune_flush_to_store) {
+ while( cur!=null ) {
- // If we no longer have fast subs...
- if( startedWithFastSubs && fast_subscriptions.isEmpty ) {
+ // get the next now.. since cur may get combined and unlinked
+ // from the entry list.
+ val next = cur.getNext
- // flush tail entries that are still loaded but which have no fast subs that can process them.
- var cur = entries.getTail
- while( cur!=null ) {
- def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= cur.seq ).isDefined
- if( cur.is_loaded && !cur.hasSubs && !cur.is_prefetched && !cur.as_loaded.acquired && !haveQuickConsumer ) {
- // then flush out to make space...
- cur.flush(true)
- cur = cur.getPrevious
- } else {
- cur = null
- }
+ if( cur.hasSubs || cur.is_prefetched ) {
+ distance_from_sub = 0
+ } else {
+ distance_from_sub += 1
+ if( cur.can_combine_with_prev ) {
+ cur.getPrevious.as_flushed_range.combineNext
+ } else {
+ if( cur.is_flushed && distance_from_sub > tune_flush_range_size ) {
+ cur.flush_range
}
-
}
+ }
+ cur = next
+ }
+ debug("combined %d entries", combine_counter)
+ }
- // Combine flushed items into flushed ranges
- if( flushed_items > tune_flush_range_size*2 ) {
-
- debug("Looking for flushed entries to combine")
-
- var distance_from_sub = tune_flush_range_size;
- var cur = entries.getHead
- var combine_counter = 0;
-
- while( cur!=null ) {
+ }
- // get the next now.. since cur may get combined and unlinked
- // from the entry list.
- val next = cur.getNext
+ def schedual_consumer_sample:Unit = {
- if( cur.hasSubs || cur.is_prefetched ) {
- distance_from_sub = 0
- } else {
- distance_from_sub += 1
- if( cur.can_combine_with_prev ) {
- cur.getPrevious.as_flushed_range.combineNext
- } else {
- if( cur.is_flushed && distance_from_sub > tune_flush_range_size ) {
- cur.flush_range
- }
- }
+ def slowConsumerCheck = {
+ if( serviceState.isStarted ) {
- }
- cur = next
+ // target tune_min_subscription_rate / sec
+ all_subscriptions.foreach{ case (consumer, sub)=>
+ sub.advanced_sizes += {
+ if( sub.tail_parkings > 0 ) {
+ sub.advanced_size.max(1024*1024*20)
+ } else {
+ sub.advanced_size
}
-
- debug("combined %d entries", combine_counter)
-
}
+ sub.tail_parkings = 0
+ while( sub.advanced_sizes.size > 10 ) {
+ sub.advanced_sizes = sub.advanced_sizes.drop(1)
+ }
+ sub.total_advanced_size += sub.advanced_size
+ sub.advanced_size = 0
-// // Trigger a swap if we have consumers waiting for messages and we are full..
-// if( idleConsumerCount > 0 && messages.full && flushing_size==0 ) {
-//
-// debug("swapping...")
-// var entry = head_entry.getNext
-// while( entry!=null ) {
-// val loaded = entry.as_loaded
-//
-// // Keep around prefetched and loaded entries.
-// if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
-// entry.load
-// } else {
-// // flush the the others out of memory.
-// entry.flush(true)
-// }
-// entry = entry.getNext
-// }
-//
-// }
}
- schedual_slow_consumer_check
+ swap_messages
+ schedual_consumer_sample
}
}
- dispatchQueue.dispatchAfter(tune_slow_check_interval, TimeUnit.MILLISECONDS, ^{
+ dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{
slowConsumerCheck
})
}
@@ -548,14 +488,9 @@ class Queue(val host: VirtualHost, var i
def bind(values: List[DeliveryConsumer]) = retaining(values) {
for (consumer <- values) {
- val subscription = if( tune_flush_to_store) {
- new PrefetchingSubscription(this)
- } else {
- new Subscription(this)
- }
+ val subscription = new Subscription(this)
subscription.open(consumer)
all_subscriptions += consumer -> subscription
- fast_subscriptions ::= subscription
addCapacity( tune_consumer_buffer )
}
} >>: dispatchQueue
@@ -565,7 +500,6 @@ class Queue(val host: VirtualHost, var i
all_subscriptions.get(consumer) match {
case Some(subscription) =>
all_subscriptions -= consumer
- fast_subscriptions = fast_subscriptions.filterNot(_ eq subscription)
subscription.close
addCapacity( -tune_consumer_buffer )
case None =>
@@ -638,12 +572,12 @@ class QueueEntry(val queue:Queue, val se
// The number of subscriptions which have requested this entry to be prefeteched (held in memory) so that it's
// ready for them to get dispatched.
- var prefetched = 0
+ var prefetch_flags = 0
// The current state of the entry: Tail | Loaded | Flushed | Tombstone
var state:EntryState = new Tail
- def is_prefetched = prefetched>0
+ def is_prefetched = prefetch_flags == 1
def <(value:QueueEntry) = this.seq < value.seq
def <=(value:QueueEntry) = this.seq <= value.seq
@@ -723,7 +657,7 @@ class QueueEntry(val queue:Queue, val se
}
override def toString = {
- "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", subscriptions: "+parked+"}"
+ "{seq: "+seq+", prefetch_flags: "+prefetch_flags+", value: "+state+", subscriptions: "+parked+"}"
}
/////////////////////////////////////////////////////
@@ -825,63 +759,17 @@ class QueueEntry(val queue:Queue, val se
def flush_range:Unit = throw new AssertionError("should only be called on flushed entries");
/**
- * Takes the current entry out of the prefetch of all subscriptions
- * which have prefetched the entry. Runs the partial function then
- * refills the prefetch of those subs that were affected.
- */
- def with_prefetch_droped(func: =>Unit ):Unit = {
- if( queue.tune_flush_to_store ) {
-
- // drop the prefetch
- val expected = prefetched
- var prefechingSubs = List[Subscription]()
- if( queue.tune_flush_to_store ) {
- // Update the prefetch counter to reflect that this entry is no longer being prefetched.
- var cur = entry
- while( cur!=null && is_prefetched ) {
- if( cur.hasSubs ) {
- (cur.parked).foreach { case sub:PrefetchingSubscription =>
- if( sub.is_prefetched(entry) ) {
- sub.remove_from_prefetch(entry)
- prefechingSubs ::= sub
- }
- }
- }
- cur = cur.getPrevious
- }
- }
- if(prefetched!=0) {
- assert(prefetched==0, "entry should not be prefetched.")
- }
- assert(expected == prefechingSubs.size, "should get all the subs")
-
- func
-
- // refill the prefetch
- prefechingSubs.foreach{ case sub =>
- sub.refill_prefetch
- }
-
- } else {
- func
- }
- }
-
- /**
* Removes the entry from the queue's linked list of entries. This gets called
* as a result of an aquired ack.
*/
def remove = {
- with_prefetch_droped {
-
- // advance subscriptions that were on this entry..
- advance(parked)
- parked = Nil
-
- // take the entry of the entries list..
- unlink
-
- }
+ // advance subscriptions that were on this entry..
+ advance(parked)
+ parked = Nil
+
+ // take the entry of the entries list..
+ unlink
+ //TODO: perhaps refill subscriptions.
}
/**
@@ -892,6 +780,7 @@ class QueueEntry(val queue:Queue, val se
val nextPos = nextOrTail
nextPos :::= advancing.toList
advancing.foreach(_.advance(nextPos))
+ queue.trigger_swap
}
}
@@ -987,6 +876,7 @@ class QueueEntry(val queue:Queue, val se
if( queue.tune_flush_to_store ) {
if( stored ) {
flushing=true
+ queue.flushing_size+=size
flushed
} else {
if( !flushing ) {
@@ -1033,6 +923,7 @@ class QueueEntry(val queue:Queue, val se
stored = true
delivery.uow = null
if( flushing ) {
+ flushing = false
queue.flushing_size-=size
queue.capacity_used -= size
delivery.message.release
@@ -1136,12 +1027,13 @@ class QueueEntry(val queue:Queue, val se
// the advancing subs move on to the next entry...
advance(advancing)
- // flush this entry out if it's not going to be needed soon.
- def haveQuickConsumer = queue.fast_subscriptions.find( sub=> sub.pos.seq <= seq ).isDefined
- if( !hasSubs && !is_prefetched && !acquired && !haveQuickConsumer ) {
- // then flush out to make space...
- flush(false)
- }
+// // flush this entry out if it's not going to be needed soon.
+// if( !hasSubs && prefetch_flags==0 ) {
+// // then flush out to make space...
+// var flush_asap = !acquired
+// flush(flush_asap)
+// }
+ queue.trigger_swap
return true
}
}
@@ -1184,9 +1076,7 @@ class QueueEntry(val queue:Queue, val se
// pass off to a source so it can aggregate multiple
// loads to reduce cross thread synchronization
if( delivery.isDefined ) {
- queue.dispatchQueue {
- queue.store_load_source.merge((this, delivery.get))
- }
+ queue.store_load_source.merge((this, delivery.get))
} else {
info("Detected store dropped message at seq: %d", seq)
@@ -1306,18 +1196,17 @@ class QueueEntry(val queue:Queue, val se
queue.enqueue_size_counter += size_delta
}
- with_prefetch_droped {
-
- linkAfter(tmpList)
- val next = getNext
+ linkAfter(tmpList)
+ val next = getNext
- // move the subs to the first entry that we just loaded.
- parked.foreach(_.advance(next))
- next :::= parked
+ // move the subs to the first entry that we just loaded.
+ parked.foreach(_.advance(next))
+ next :::= parked
+ queue.trigger_swap
- unlink
+ unlink
- }
+ // TODO: refill prefetches
}
}
}
@@ -1365,20 +1254,23 @@ class Subscription(queue:Queue) extends
var session: DeliverySession = null
var pos:QueueEntry = null
- var advanced_size = 0L
-
- // Vars used to detect slow consumers.
- var last_advanced_size = 0L
- var tail_parkings = 0
- var slow_intervals = 0
-
- def slow = slow_intervals > queue.tune_max_slow_intervals
-
var acquired_size = 0L
+ var total_advanced_size = 0L
+ var advanced_size = 0
+ var advanced_sizes = ListBuffer[Int](1024*1024*20) // use circular buffer instead.
+
+ var tail_parkings = 1
+
+ var best_advanced_size = if(advanced_sizes.isEmpty) {
+ 0
+ } else {
+ advanced_sizes.foldLeft(0)(_ max _)
+ }
+
override def toString = {
def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
- "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+", tail_parkings: "+tail_parkings+"}"
+ "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+"}"
}
def browser = session.consumer.browser
@@ -1411,6 +1303,8 @@ class Subscription(queue:Queue) extends
session.refiller = NOOP
session.close
session = null
+
+ queue.trigger_swap
}
/**
@@ -1429,9 +1323,8 @@ class Subscription(queue:Queue) extends
pos = value
session.refiller = pos
- refill_prefetch
if( tail_parked ) {
- tail_parkings += 1
+ tail_parkings += 0
}
}
@@ -1456,7 +1349,37 @@ class Subscription(queue:Queue) extends
def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
- def refill_prefetch = {}
+ def refill_prefetch = {
+
+ var next = if( pos.is_tail ) {
+ null // can't prefetch the tail..
+ } else if( pos.is_head ) {
+ pos.getNext // can't prefetch the head.
+ } else {
+ pos // start prefetching from the current position.
+ }
+
+ var remaining = queue.tune_consumer_buffer - acquired_size
+ while( remaining>0 && next!=null ) {
+ remaining -= next.size
+ next.prefetch_flags |= 1
+ next.load
+ next = next.getNext
+ }
+
+ remaining = if(tail_parkings > 0) {
+ queue.tune_consumer_buffer
+ } else {
+ best_advanced_size
+ }
+
+ while( remaining>0 && next!=null ) {
+ remaining -= next.size
+ next.prefetch_flags |= 2
+ next = next.getNext
+ }
+
+ }
class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {
@@ -1495,7 +1418,7 @@ class Subscription(queue:Queue) extends
val next = entry.nextOrTail
entry.remove // entry size changes to 0
- refill_prefetch
+ queue.trigger_swap
next.run
}
@@ -1524,119 +1447,3 @@ class Subscription(queue:Queue) extends
}
-/**
- * A subscription which issues message load requests so that messages are prefetched from
- * the store before they are needed for dispatching purposes.
- */
-class PrefetchingSubscription(queue:Queue) extends Subscription(queue) {
-
- var prefetch_head:QueueEntry = null
- var prefetch_tail:QueueEntry = null
- var prefetched_size = 0
-
- override def toString = {
- def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
- "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+", prefetch_size: "+prefetched_size+", prefetch_head: "+seq(prefetch_head)+", prefetch_tail: "+seq(prefetch_tail)+", tail_parkings: "+tail_parkings+", prefetchFull: "+prefetch_full+"}"
- }
-
-
- override def advance(value:QueueEntry):Unit = {
- super.advance(value)
- refill_prefetch // update the prefetch window.
- }
-
-
- override def rewind(value: QueueEntry) = {
- invalidate_prefetch
- super.rewind(value)
- }
-
-
- override def close() = {
- invalidate_prefetch
- super.close
- }
-
- def prefetch_full = acquired_size + prefetched_size >= queue.tune_consumer_buffer
-
- override def refill_prefetch() = {
-
- // first lets reclaim prefetch space
- while( prefetch_head!=null && prefetch_head < pos ) {
- remove_from_prefetch(prefetch_head)
- }
-
- // now lets fill the prefetch if it has capacity.
- if( !prefetch_full ) {
-
- var next = if(prefetch_tail==null) {
- if( pos.is_tail ) {
- null // can't prefetch the tail..
- } else if( pos.is_head ) {
- pos.getNext // can't prefetch the head.
- } else {
- pos // start prefetching from the current position.
- }
- } else {
- prefetch_tail.getNext // continue prefetching from the last prefetch tail
- }
-
- while( !prefetch_full && next!=null ) {
-
- prefetched_size += next.size
- next.prefetched += 1
- next.load
-
- if( prefetch_head==null ) {
- prefetch_head = next
- }
- prefetch_tail = next
-
- next = next.getNext
- }
- }
- }
-
-
-
- /**
- * Is the specified queue entry prefeteched by this subscription?
- */
- def is_prefetched(value:QueueEntry) = {
- assert(value!=null)
- prefetch_head!=null && prefetch_head <= value && value <= prefetch_tail
- }
-
- def remove_from_prefetch(entry:QueueEntry):Unit = {
- prefetched_size -= entry.size
- entry.prefetched -= 1
-
- if( entry == prefetch_head ) {
- if( entry == prefetch_tail ) {
- prefetch_head = null
- prefetch_tail = null
- assert( prefetched_size == 0 , "inconsistent prefetch size.")
- } else {
- prefetch_head = prefetch_head.getNext
- if( prefetched_size == 0 ) {
- assert( prefetched_size != 0 , "inconsistent prefetch size.")
- }
- }
- } else {
- if( entry == prefetch_tail ) {
- prefetch_tail = prefetch_tail.getPrevious
- }
- if( prefetched_size == 0 ) {
- assert( prefetched_size != 0 , "inconsistent prefetch size.")
- }
- }
- }
-
- def invalidate_prefetch: Unit = {
- while (prefetch_head !=null ) {
- remove_from_prefetch(prefetch_head)
- }
- assert(prefetched_size == 0, "inconsistent prefetch size.")
- }
-
-}
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java Mon Dec 6 02:25:48 2010
@@ -51,7 +51,7 @@ public class EntryStatusDTO {
@XmlAttribute(name = "consumer-count")
public int consumer_count;
- @XmlAttribute(name = "prefetch-count")
- public int prefetch_count;
+ @XmlAttribute(name = "is-prefetched")
+ public boolean is_prefetched;
}
\ No newline at end of file
Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Mon Dec 6 02:25:48 2010
@@ -78,21 +78,6 @@ public class QueueDTO {
public Integer consumer_buffer;
/**
- * Subscribers that consume slower than this rate per seconds will be considered
- * slow. Once a consumer is considered slow, we may switch to disk spooling.
- */
- @XmlAttribute(name="slow-subscription-rate")
- @JsonProperty("slow_subscription_rate")
- public Integer slow_subscription_rate;
-
- /**
- * The number of milliseconds between slow consumer checks.
- */
- @XmlAttribute(name="slow-check-interval")
- @JsonProperty("slow_check_interval")
- public Long slow_check_interval;
-
- /**
* Should this queue persistently store it's entries?
*/
@XmlAttribute(name="persistent")
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Mon Dec 6 02:25:48 2010
@@ -194,7 +194,7 @@ case class RuntimeResource(parent:Broker
e.count = cur.count
e.size = cur.size
e.consumer_count = cur.parked.size
- e.prefetch_count = cur.prefetched
+ e.is_prefetched = cur.is_prefetched
e.state = cur.label
result.entries.add(e)
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade Mon Dec 6 02:25:48 2010
@@ -18,7 +18,7 @@
- import helper._
.breadcumbs
- a(href={strip_resolve(".")}) Back
+ a(href={strip_resolve("..")}) Back
h1 Destination: #{name}
Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Mon Dec 6 02:25:48 2010
@@ -33,14 +33,11 @@ p queue size: #{memory(queue_size)}
h2 Enqueue/Deqeueue Counters
-p enqueued: #{enqueue_item_counter} messages
-p enqueued: #{memory(enqueue_size_counter)}
+p enqueued: #{enqueue_item_counter} messages (#{memory(enqueue_size_counter)})
-p dequeued: #{dequeue_item_counter} messages
-p dequeued: #{memory(dequeue_size_counter)}
+p dequeued: #{dequeue_item_counter} messages (#{memory(dequeue_size_counter)})
-p nacked: #{nack_item_counter} messages
-p nacked: #{memory(nack_size_counter)}
+p nacked: #{nack_item_counter} messages (#{memory(nack_size_counter)})
h2 Swap Status
@@ -80,6 +77,6 @@ ul
tr
td #{x.state}
td #{memory(x.size)}
- td #{x.consumer_count}, #{x.prefetch_count}
+ td #{x.consumer_count}, #{x.is_prefetched}
td #{x.seq}:#{x.count}