You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/12/06 03:25:48 UTC

svn commit: r1042514 - in /activemq/activemq-apollo/trunk: apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/ apollo-dto/src/main/java/org/apache/activemq/apollo/dto/ apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/ apol...

Author: chirino
Date: Mon Dec  6 02:25:48 2010
New Revision: 1042514

URL: http://svn.apache.org/viewvc?rev=1042514&view=rev
Log:
- Simplified the prefetching model used in the queues.

Modified:
    activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java
    activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
    activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
    activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade

Modified: activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Mon Dec  6 02:25:48 2010
@@ -46,7 +46,6 @@ class Queue(val host: VirtualHost, var i
 
   var inbound_sessions = Set[DeliverySession]()
   var all_subscriptions = Map[DeliveryConsumer, Subscription]()
-  var fast_subscriptions = List[Subscription]()
 
   val filter = binding.message_filter
 
@@ -83,29 +82,12 @@ class Queue(val host: VirtualHost, var i
   /**
    *  The amount of memory buffer space for receiving messages.
    */
-  def tune_producer_buffer = config.producer_buffer.getOrElse(1024*32)
+  def tune_producer_buffer = config.producer_buffer.getOrElse(32*1024)
 
   /**
    *  The amount of memory buffer space for the queue..
    */
-  def tune_queue_buffer = config.queue_buffer.getOrElse(1024*32)
-
-  /**
-   * Subscribers that consume slower than this rate per seconds will be considered
-   * slow.  Once a consumer is considered slow, we may switch to disk spooling.
-   */
-  def tune_slow_subscription_rate = config.slow_subscription_rate.getOrElse(500*1024)
-
-  /**
-   * The number of milliseconds between slow consumer checks.
-   */
-  def tune_slow_check_interval = config.slow_check_interval.getOrElse(500L)
-
-  /**
-   * The number of intervals that a consumer must not meeting the subscription rate before it is
-   * flagged as a slow consumer.
-   */
-  def tune_max_slow_intervals = config.max_slow_intervals.getOrElse(10)
+  def tune_queue_buffer = config.queue_buffer.getOrElse(32*1024)
 
   //
   // Frequently accessed tuning configuration.
@@ -140,7 +122,7 @@ class Queue(val host: VirtualHost, var i
     tune_persistent = host.store !=null && config.persistent.getOrElse(true)
     tune_flush_to_store = tune_persistent && config.flush_to_store.getOrElse(true)
     tune_flush_range_size = config.flush_range_size.getOrElse(10000)
-    tune_consumer_buffer = config.consumer_buffer.getOrElse(1024*64)
+    tune_consumer_buffer = config.consumer_buffer.getOrElse(1024*128)
   }
   configure(config)
 
@@ -161,6 +143,10 @@ class Queue(val host: VirtualHost, var i
   var capacity = 0
   var capacity_used = 0
 
+  val swap_source = createSource(EventAggregators.INTEGER_ADD, dispatchQueue)
+  swap_source.setEventHandler(^{ swap_messages });
+  swap_source.resume
+
   protected def _start(onCompleted: Runnable) = {
 
     capacity = tune_queue_buffer;
@@ -169,14 +155,14 @@ class Queue(val host: VirtualHost, var i
       // by the time this is run, consumers and producers may have already joined.
       onCompleted.run
       display_stats
-      schedual_slow_consumer_check
+      schedual_consumer_sample
       // wake up the producers to fill us up...
       if (messages.refiller != null) {
         messages.refiller.run
       }
 
-      // kick of dispatching to the consumers.
-      all_subscriptions.valuesIterator.foreach( _.refill_prefetch )
+      // kick off dispatching to the consumers.
+      trigger_swap
       dispatchQueue << head_entry
     }
 
@@ -270,17 +256,12 @@ class Queue(val host: VirtualHost, var i
           entry.as_loaded.store
         }
 
-        def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= entry.seq ).isDefined
-
         var dispatched = false
-        if( entry.hasSubs || haveQuickConsumer ) {
+        if( entry.hasSubs ) {
           // try to dispatch it directly...
           entry.dispatch
-        } else {
-          // we flush the entry out right away if it looks
-          // it wont be needed.
-          entry.flush(true)
         }
+        trigger_swap
 
         // release the store batch...
         if (queueDelivery.uow != null) {
@@ -329,141 +310,100 @@ class Queue(val host: VirtualHost, var i
     }
   }
 
-  def schedual_slow_consumer_check:Unit = {
-
-    def slowConsumerCheck = {
-      if( serviceState.isStarted ) {
-
-        // target tune_min_subscription_rate / sec
-        val slow_cursor_delta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
-        var idleConsumerCount = 0
-
-
-        var startedWithFastSubs = !fast_subscriptions.isEmpty
-        fast_subscriptions = Nil
+  def trigger_swap = {
+    if( tune_flush_to_store ) {
+      swap_source.merge(1)
+    }
+  }
 
-        all_subscriptions.foreach{ case (consumer, sub)=>
+  def swap_messages = {
 
-          // Skip over new consumers...
-          if( sub.advanced_size != 0 ) {
+    // reset the prefetch flags...
+    var cur = entries.getHead
+    while( cur!=null ) {
+      cur.prefetch_flags = 0
+      cur = cur.getNext
+    }
 
-            val cursor_delta = sub.advanced_size - sub.last_advanced_size
-            sub.last_advanced_size = sub.advanced_size
+    // Set the prefetch flags
+    all_subscriptions.valuesIterator.foreach( _.refill_prefetch )
 
-            // If the subscription is NOT slow if it's it is tail parked or
-            // it's been parking or at least advancing through the data at
-            // the tune_slow_subscription_rate
-            if( sub.tail_parked || sub.tail_parkings>0 || cursor_delta >= slow_cursor_delta ) {
-              if( sub.slow ) {
-                debug("subscription is now fast: %s", sub)
-                sub.slow_intervals = 0
-              }
-            } else {
-              if( !sub.slow ) {
-                debug("slow interval: %d, %d, %d < %d", sub.slow_intervals, sub.tail_parkings, cursor_delta, slow_cursor_delta)
-                sub.slow_intervals += 1
-                if( sub.slow ) {
-                  debug("subscription is slow: %s", sub)
-                }
-              }
-            }
+    // swap out messages.
+    cur = entries.getHead
+    while( cur!=null ) {
+      if( cur.is_loaded && cur.prefetch_flags==0 && !cur.as_loaded.acquired ) {
+        val flush_asap = !cur.as_loaded.acquired
+//        display_active_entries
+        cur.flush(flush_asap)
+      }
+      cur = cur.getNext
+    }
 
-            // has the consumer been stuck at the tail?
-            if( sub.tail_parked && sub.tail_parkings==0 ) {
-              idleConsumerCount += 1;
-            }
 
-            sub.tail_parkings = 0
-          }
+    // Combine flushed items into flushed ranges
+    if( flushed_items > tune_flush_range_size*2 ) {
 
-          if( !sub.slow ) {
-            fast_subscriptions ::= sub
-          }
-        }
+      debug("Looking for flushed entries to combine")
 
+      var distance_from_sub = tune_flush_range_size;
+      var cur = entries.getHead
+      var combine_counter = 0;
 
-        if (tune_flush_to_store) {
+      while( cur!=null ) {
 
-          // If we no longer have fast subs...
-          if( startedWithFastSubs && fast_subscriptions.isEmpty ) {
+        // get the next now.. since cur may get combined and unlinked
+        // from the entry list.
+        val next = cur.getNext
 
-            // flush tail entries that are still loaded but which have no fast subs that can process them.
-            var cur = entries.getTail
-            while( cur!=null ) {
-              def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= cur.seq ).isDefined
-              if( cur.is_loaded && !cur.hasSubs && !cur.is_prefetched && !cur.as_loaded.acquired && !haveQuickConsumer ) {
-                // then flush out to make space...
-                cur.flush(true)
-                cur = cur.getPrevious
-              } else {
-                cur = null
-              }
+        if( cur.hasSubs || cur.is_prefetched ) {
+          distance_from_sub = 0
+        } else {
+          distance_from_sub += 1
+          if( cur.can_combine_with_prev ) {
+            cur.getPrevious.as_flushed_range.combineNext
+          } else {
+            if( cur.is_flushed && distance_from_sub > tune_flush_range_size ) {
+              cur.flush_range
             }
-
           }
 
+        }
+        cur = next
+      }
+      debug("combined %d entries", combine_counter)
+    }
 
-          // Combine flushed items into flushed ranges
-          if( flushed_items > tune_flush_range_size*2 ) {
-
-            debug("Looking for flushed entries to combine")
-
-            var distance_from_sub = tune_flush_range_size;
-            var cur = entries.getHead
-            var combine_counter = 0;
-
-            while( cur!=null ) {
+  }
 
-              // get the next now.. since cur may get combined and unlinked
-              // from the entry list.
-              val next = cur.getNext
+  def schedual_consumer_sample:Unit = {
 
-              if( cur.hasSubs || cur.is_prefetched ) {
-                distance_from_sub = 0
-              } else {
-                distance_from_sub += 1
-                if( cur.can_combine_with_prev ) {
-                  cur.getPrevious.as_flushed_range.combineNext
-                } else {
-                  if( cur.is_flushed && distance_from_sub > tune_flush_range_size ) {
-                    cur.flush_range
-                  }
-                }
+    def slowConsumerCheck = {
+      if( serviceState.isStarted ) {
 
-              }
-              cur = next
+        // target tune_min_subscription_rate / sec
+        all_subscriptions.foreach{ case (consumer, sub)=>
+          sub.advanced_sizes += {
+            if( sub.tail_parkings > 0  ) {
+              sub.advanced_size.max(1024*1024*20)
+            } else {
+              sub.advanced_size
             }
-
-            debug("combined %d entries", combine_counter)
-
           }
+          sub.tail_parkings = 0
+          while( sub.advanced_sizes.size > 10 ) {
+            sub.advanced_sizes = sub.advanced_sizes.drop(1)
+          }
+          sub.total_advanced_size += sub.advanced_size
+          sub.advanced_size = 0
 
-//          // Trigger a swap if we have consumers waiting for messages and we are full..
-//          if( idleConsumerCount > 0 && messages.full && flushing_size==0 ) {
-//
-//            debug("swapping...")
-//            var entry = head_entry.getNext
-//            while( entry!=null ) {
-//              val loaded = entry.as_loaded
-//
-//              // Keep around prefetched and loaded entries.
-//              if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
-//                entry.load
-//              } else {
-//                // flush the the others out of memory.
-//                entry.flush(true)
-//              }
-//              entry = entry.getNext
-//            }
-//
-//          }
         }
 
-        schedual_slow_consumer_check
+        swap_messages
+        schedual_consumer_sample
       }
     }
 
-    dispatchQueue.dispatchAfter(tune_slow_check_interval, TimeUnit.MILLISECONDS, ^{
+    dispatchQueue.dispatchAfter(1, TimeUnit.SECONDS, ^{
       slowConsumerCheck
     })
   }
@@ -548,14 +488,9 @@ class Queue(val host: VirtualHost, var i
 
   def bind(values: List[DeliveryConsumer]) = retaining(values) {
     for (consumer <- values) {
-      val subscription = if( tune_flush_to_store) {
-        new PrefetchingSubscription(this)
-      } else {
-        new Subscription(this)
-      }
+      val subscription = new Subscription(this)
       subscription.open(consumer)
       all_subscriptions += consumer -> subscription
-      fast_subscriptions ::= subscription
       addCapacity( tune_consumer_buffer )
     }
   } >>: dispatchQueue
@@ -565,7 +500,6 @@ class Queue(val host: VirtualHost, var i
       all_subscriptions.get(consumer) match {
         case Some(subscription) =>
           all_subscriptions -= consumer
-          fast_subscriptions = fast_subscriptions.filterNot(_ eq subscription)
           subscription.close
           addCapacity( -tune_consumer_buffer )
         case None =>
@@ -638,12 +572,12 @@ class QueueEntry(val queue:Queue, val se
 
   // The number of subscriptions which have requested this entry to be prefeteched (held in memory) so that it's
   // ready for them to get dispatched.
-  var prefetched = 0
+  var prefetch_flags = 0
 
   // The current state of the entry: Tail | Loaded | Flushed | Tombstone
   var state:EntryState = new Tail
 
-  def is_prefetched = prefetched>0
+  def is_prefetched = prefetch_flags == 1
 
   def <(value:QueueEntry) = this.seq < value.seq
   def <=(value:QueueEntry) = this.seq <= value.seq
@@ -723,7 +657,7 @@ class QueueEntry(val queue:Queue, val se
   }
 
   override def toString = {
-    "{seq: "+seq+", prefetched: "+prefetched+", value: "+state+", subscriptions: "+parked+"}"
+    "{seq: "+seq+", prefetch_flags: "+prefetch_flags+", value: "+state+", subscriptions: "+parked+"}"
   }
 
   /////////////////////////////////////////////////////
@@ -825,63 +759,17 @@ class QueueEntry(val queue:Queue, val se
     def flush_range:Unit = throw new AssertionError("should only be called on flushed entries");
 
     /**
-     * Takes the current entry out of the prefetch of all subscriptions
-     * which have prefetched the entry.  Runs the partial function then
-     * refills the prefetch of those subs that were affected.
-     */
-    def with_prefetch_droped(func: =>Unit ):Unit = {
-      if( queue.tune_flush_to_store ) {
-
-        // drop the prefetch
-        val expected = prefetched
-        var prefechingSubs = List[Subscription]()
-        if( queue.tune_flush_to_store ) {
-          // Update the prefetch counter to reflect that this entry is no longer being prefetched.
-          var cur = entry
-          while( cur!=null && is_prefetched ) {
-            if( cur.hasSubs ) {
-              (cur.parked).foreach { case sub:PrefetchingSubscription =>
-                if( sub.is_prefetched(entry) ) {
-                  sub.remove_from_prefetch(entry)
-                  prefechingSubs ::= sub
-                }
-              }
-            }
-            cur = cur.getPrevious
-          }
-        }
-        if(prefetched!=0) {
-          assert(prefetched==0, "entry should not be prefetched.")
-        }
-        assert(expected == prefechingSubs.size, "should get all the subs")
-
-        func
-
-        // refill the prefetch
-        prefechingSubs.foreach{ case sub =>
-          sub.refill_prefetch
-        }
-
-      } else {
-        func
-      }
-    }
-
-    /**
      * Removes the entry from the queue's linked list of entries.  This gets called
      * as a result of an aquired ack.
      */
     def remove = {
-      with_prefetch_droped {
-
-        // advance subscriptions that were on this entry..
-        advance(parked)
-        parked = Nil
-
-        // take the entry of the entries list..
-        unlink
-
-      }
+      // advance subscriptions that were on this entry..
+      advance(parked)
+      parked = Nil
+
+      // take the entry of the entries list..
+      unlink
+      //TODO: perhaps refill subscriptions.
     }
 
     /**
@@ -892,6 +780,7 @@ class QueueEntry(val queue:Queue, val se
       val nextPos = nextOrTail
       nextPos :::= advancing.toList
       advancing.foreach(_.advance(nextPos))
+      queue.trigger_swap
     }
 
   }
@@ -987,6 +876,7 @@ class QueueEntry(val queue:Queue, val se
       if( queue.tune_flush_to_store ) {
         if( stored ) {
           flushing=true
+          queue.flushing_size+=size
           flushed
         } else {
           if( !flushing ) {
@@ -1033,6 +923,7 @@ class QueueEntry(val queue:Queue, val se
       stored = true
       delivery.uow = null
       if( flushing ) {
+        flushing = false
         queue.flushing_size-=size
         queue.capacity_used -= size
         delivery.message.release
@@ -1136,12 +1027,13 @@ class QueueEntry(val queue:Queue, val se
         // the advancing subs move on to the next entry...
         advance(advancing)
 
-        // flush this entry out if it's not going to be needed soon.
-        def haveQuickConsumer = queue.fast_subscriptions.find( sub=> sub.pos.seq <= seq ).isDefined
-        if( !hasSubs && !is_prefetched && !acquired && !haveQuickConsumer ) {
-          // then flush out to make space...
-          flush(false)
-        }
+//        // flush this entry out if it's not going to be needed soon.
+//        if( !hasSubs && prefetch_flags==0 ) {
+//          // then flush out to make space...
+//          var flush_asap = !acquired
+//          flush(flush_asap)
+//        }
+        queue.trigger_swap
         return true
       }
     }
@@ -1184,9 +1076,7 @@ class QueueEntry(val queue:Queue, val se
           // pass off to a source so it can aggregate multiple
           // loads to reduce cross thread synchronization
           if( delivery.isDefined ) {
-            queue.dispatchQueue {
-              queue.store_load_source.merge((this, delivery.get))
-            }
+            queue.store_load_source.merge((this, delivery.get))
           } else {
 
             info("Detected store dropped message at seq: %d", seq)
@@ -1306,18 +1196,17 @@ class QueueEntry(val queue:Queue, val se
               queue.enqueue_size_counter += size_delta
             }
 
-            with_prefetch_droped {
-
-              linkAfter(tmpList)
-              val next = getNext
+            linkAfter(tmpList)
+            val next = getNext
 
-              // move the subs to the first entry that we just loaded.
-              parked.foreach(_.advance(next))
-              next :::= parked
+            // move the subs to the first entry that we just loaded.
+            parked.foreach(_.advance(next))
+            next :::= parked
+            queue.trigger_swap
 
-              unlink
+            unlink
 
-            }
+            // TODO: refill prefetches
           }
         }
       }
@@ -1365,20 +1254,23 @@ class Subscription(queue:Queue) extends 
   var session: DeliverySession = null
   var pos:QueueEntry = null
 
-  var advanced_size = 0L
-
-  // Vars used to detect slow consumers.
-  var last_advanced_size = 0L
-  var tail_parkings = 0
-  var slow_intervals = 0
-
-  def slow = slow_intervals > queue.tune_max_slow_intervals
-
   var acquired_size = 0L
 
+  var total_advanced_size = 0L
+  var advanced_size = 0
+  var advanced_sizes = ListBuffer[Int](1024*1024*20) // use circular buffer instead.
+
+  var tail_parkings = 1
+
+  var best_advanced_size = if(advanced_sizes.isEmpty) {
+    0
+  } else {
+    advanced_sizes.foldLeft(0)(_ max _)
+  }
+
   override def toString = {
     def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
-    "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+", tail_parkings: "+tail_parkings+"}"
+    "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+"}"
   }
 
   def browser = session.consumer.browser
@@ -1411,6 +1303,8 @@ class Subscription(queue:Queue) extends 
     session.refiller = NOOP
     session.close
     session = null
+
+    queue.trigger_swap
   }
 
   /**
@@ -1429,9 +1323,8 @@ class Subscription(queue:Queue) extends 
     pos = value
     session.refiller = pos
 
-    refill_prefetch
     if( tail_parked ) {
-      tail_parkings += 1
+      tail_parkings += 0
     }
   }
 
@@ -1456,7 +1349,37 @@ class Subscription(queue:Queue) extends 
 
   def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
 
-  def refill_prefetch = {}
+  def refill_prefetch = {
+
+    var next = if( pos.is_tail ) {
+      null // can't prefetch the tail..
+    } else if( pos.is_head ) {
+      pos.getNext // can't prefetch the head.
+    } else {
+      pos // start prefetching from the current position.
+    }
+
+    var remaining = queue.tune_consumer_buffer - acquired_size
+    while( remaining>0 && next!=null ) {
+      remaining -= next.size
+      next.prefetch_flags |= 1
+      next.load
+      next = next.getNext
+    }
+
+    remaining = if(tail_parkings > 0) {
+      queue.tune_consumer_buffer
+    } else {
+      best_advanced_size
+    }
+
+    while( remaining>0 && next!=null ) {
+      remaining -= next.size
+      next.prefetch_flags |= 2
+      next = next.getNext
+    }
+
+  }
 
   class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {
 
@@ -1495,7 +1418,7 @@ class Subscription(queue:Queue) extends 
       val next = entry.nextOrTail
       entry.remove // entry size changes to 0
 
-      refill_prefetch
+      queue.trigger_swap
       next.run
     }
 
@@ -1524,119 +1447,3 @@ class Subscription(queue:Queue) extends 
 
 }
 
-/**
- * A subscription which issues message load requests so that messages are prefetched from
- * the store before they are needed for dispatching purposes.
- */
-class PrefetchingSubscription(queue:Queue) extends Subscription(queue)  {
-
-  var prefetch_head:QueueEntry = null
-  var prefetch_tail:QueueEntry = null
-  var prefetched_size = 0
-
-  override def toString = {
-    def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
-    "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+", prefetch_size: "+prefetched_size+", prefetch_head: "+seq(prefetch_head)+", prefetch_tail: "+seq(prefetch_tail)+", tail_parkings: "+tail_parkings+", prefetchFull: "+prefetch_full+"}"
-  }
-
-
-  override def advance(value:QueueEntry):Unit = {
-    super.advance(value)
-    refill_prefetch // update the prefetch window.
-  }
-
-
-  override def rewind(value: QueueEntry) = {
-    invalidate_prefetch
-    super.rewind(value)
-  }
-
-
-  override def close() = {
-    invalidate_prefetch
-    super.close
-  }
-
-  def prefetch_full = acquired_size + prefetched_size >= queue.tune_consumer_buffer
-
-  override def refill_prefetch() = {
-
-    // first lets reclaim prefetch space
-    while( prefetch_head!=null && prefetch_head < pos ) {
-      remove_from_prefetch(prefetch_head)
-    }
-
-    // now lets fill the prefetch if it has capacity.
-    if( !prefetch_full ) {
-
-      var next = if(prefetch_tail==null) {
-        if( pos.is_tail ) {
-          null // can't prefetch the tail..
-        } else if( pos.is_head ) {
-          pos.getNext // can't prefetch the head.
-        } else {
-          pos // start prefetching from the current position.
-        }
-      } else  {
-        prefetch_tail.getNext // continue prefetching from the last prefetch tail
-      }
-
-      while( !prefetch_full && next!=null ) {
-
-        prefetched_size += next.size
-        next.prefetched += 1
-        next.load
-
-        if( prefetch_head==null ) {
-          prefetch_head = next
-        }
-        prefetch_tail = next
-
-        next = next.getNext
-      }
-    }
-  }
-
-
-
-  /**
-   * Is the specified queue entry prefeteched by this subscription?
-   */
-  def is_prefetched(value:QueueEntry) = {
-    assert(value!=null)
-    prefetch_head!=null && prefetch_head <= value && value <= prefetch_tail
-  }
-
-  def remove_from_prefetch(entry:QueueEntry):Unit = {
-    prefetched_size -= entry.size
-    entry.prefetched -= 1
-
-    if( entry == prefetch_head ) {
-      if( entry == prefetch_tail ) {
-        prefetch_head = null
-        prefetch_tail = null
-        assert( prefetched_size == 0 , "inconsistent prefetch size.")
-      } else {
-        prefetch_head = prefetch_head.getNext
-        if( prefetched_size == 0 ) {
-          assert( prefetched_size != 0 , "inconsistent prefetch size.")
-        }
-      }
-    } else {
-      if( entry == prefetch_tail ) {
-        prefetch_tail = prefetch_tail.getPrevious
-      }
-      if( prefetched_size == 0 ) {
-        assert( prefetched_size != 0 , "inconsistent prefetch size.")
-      }
-    }
-  }
-
-  def invalidate_prefetch: Unit = {
-    while (prefetch_head !=null ) {
-      remove_from_prefetch(prefetch_head)
-    }
-    assert(prefetched_size == 0, "inconsistent prefetch size.")
-  }
-
-}

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/EntryStatusDTO.java Mon Dec  6 02:25:48 2010
@@ -51,7 +51,7 @@ public class EntryStatusDTO {
     @XmlAttribute(name = "consumer-count")
     public int consumer_count;
 
-    @XmlAttribute(name = "prefetch-count")
-    public int prefetch_count;
+    @XmlAttribute(name = "is-prefetched")
+    public boolean is_prefetched;
 
 }
\ No newline at end of file

Modified: activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java (original)
+++ activemq/activemq-apollo/trunk/apollo-dto/src/main/java/org/apache/activemq/apollo/dto/QueueDTO.java Mon Dec  6 02:25:48 2010
@@ -78,21 +78,6 @@ public class QueueDTO {
     public Integer consumer_buffer;
 
     /**
-     * Subscribers that consume slower than this rate per seconds will be considered
-     * slow.  Once a consumer is considered slow, we may switch to disk spooling.
-     */
-    @XmlAttribute(name="slow-subscription-rate")
-    @JsonProperty("slow_subscription_rate")
-    public Integer slow_subscription_rate;
-
-    /**
-     * The number of milliseconds between slow consumer checks.
-     */
-    @XmlAttribute(name="slow-check-interval")
-    @JsonProperty("slow_check_interval")
-    public Long slow_check_interval;
-
-    /**
      * Should this queue persistently store it's entries?
      */
     @XmlAttribute(name="persistent")

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/scala/org/apache/activemq/apollo/web/resources/RuntimeResource.scala Mon Dec  6 02:25:48 2010
@@ -194,7 +194,7 @@ case class RuntimeResource(parent:Broker
               e.count = cur.count
               e.size = cur.size
               e.consumer_count = cur.parked.size
-              e.prefetch_count = cur.prefetched
+              e.is_prefetched = cur.is_prefetched
               e.state = cur.label
 
               result.entries.add(e)

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/DestinationStatusDTO.jade Mon Dec  6 02:25:48 2010
@@ -18,7 +18,7 @@
 - import helper._
 
 .breadcumbs
-  a(href={strip_resolve(".")}) Back
+  a(href={strip_resolve("..")}) Back
 
 h1 Destination: #{name}
 

Modified: activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade?rev=1042514&r1=1042513&r2=1042514&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade (original)
+++ activemq/activemq-apollo/trunk/apollo-web/src/main/webapp/WEB-INF/org/apache/activemq/apollo/dto/QueueStatusDTO.jade Mon Dec  6 02:25:48 2010
@@ -33,14 +33,11 @@ p queue size: #{memory(queue_size)}
 
 h2 Enqueue/Deqeueue Counters
 
-p enqueued: #{enqueue_item_counter} messages
-p enqueued: #{memory(enqueue_size_counter)}
+p enqueued: #{enqueue_item_counter} messages (#{memory(enqueue_size_counter)})
 
-p dequeued: #{dequeue_item_counter} messages
-p dequeued: #{memory(dequeue_size_counter)}
+p dequeued: #{dequeue_item_counter} messages (#{memory(dequeue_size_counter)})
 
-p nacked: #{nack_item_counter} messages
-p nacked: #{memory(nack_size_counter)}
+p nacked: #{nack_item_counter} messages (#{memory(nack_size_counter)})
 
 h2 Swap Status
 
@@ -80,6 +77,6 @@ ul
       tr
         td #{x.state}
         td #{memory(x.size)}
-        td #{x.consumer_count}, #{x.prefetch_count}
+        td #{x.consumer_count}, #{x.is_prefetched}
         td #{x.seq}:#{x.count}