You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:14:34 UTC

svn commit: r961182 - /activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Author: chirino
Date: Wed Jul  7 04:14:34 2010
New Revision: 961182

URL: http://svn.apache.org/viewvc?rev=961182&view=rev
Log:
Moved all the prefetch-related logic into a new PrefetchingSubscription subclass.  This should make that logic easier to maintain.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961182&r1=961181&r2=961182&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:14:34 2010
@@ -28,6 +28,7 @@ import java.util.concurrent.TimeUnit
 import java.util.{HashSet, Collections, ArrayList, LinkedList}
 import org.apache.activemq.apollo.store.{QueueEntryRange, QueueEntryRecord, MessageRecord}
 import collection.mutable.ListBuffer
+import java.util.concurrent.atomic.AtomicInteger
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -52,6 +53,7 @@ trait QueueLifecyleListener {
 
 
 object Queue extends Log {
+  val subcsription_counter = new AtomicInteger(0)
 }
 
 /**
@@ -248,7 +250,7 @@ class Queue(val host: VirtualHost, val d
         } else {
           // we flush the entry out right away if it looks
           // it wont be needed.
-          entry.flush
+          entry.flush(true)
         }
 
         // release the store batch...
@@ -384,7 +386,7 @@ class Queue(val host: VirtualHost, val d
           def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= cur.seq ).isDefined
           if( cur.is_loaded && !cur.hasSubs && !cur.is_prefetched && !cur.as_loaded.acquired && !haveQuickConsumer ) {
             // then flush out to make space...
-            cur.flush
+            cur.flush(true)
             cur = cur.getPrevious
           } else {
             cur = null
@@ -477,7 +479,11 @@ class Queue(val host: VirtualHost, val d
 
   def bind(values: List[DeliveryConsumer]) = retaining(values) {
     for (consumer <- values) {
-      val subscription = new Subscription(this)
+      val subscription = if( tune_flush_to_store) {
+        new PrefetchingSubscription(this)
+      } else {
+        new Subscription(this)
+      }
       subscription.open(consumer)
       all_subscriptions += consumer -> subscription
       fast_subscriptions ::= subscription
@@ -535,7 +541,7 @@ class Queue(val host: VirtualHost, val d
         entry.load
       } else {
         // flush the the others out of memory.
-        entry.flush
+        entry.flush(true)
       }
       entry = entry.getNext
     }
@@ -602,6 +608,9 @@ class QueueEntry(val queue:Queue, val se
 
   def is_prefetched = prefetched>0
 
+  def <(value:QueueEntry) = this.seq < value.seq
+  def <=(value:QueueEntry) = this.seq <= value.seq
+
   def head():QueueEntry = {
     state = new Head
     this
@@ -708,7 +717,7 @@ class QueueEntry(val queue:Queue, val se
   def dispatch() = state.dispatch
 
   // These methods may cause a change in the current state.
-  def flush = state.flush
+  def flush(asap:Boolean) = state.flush(asap)
   def load = state.load
   def remove = state.remove
 
@@ -752,32 +761,49 @@ class QueueEntry(val queue:Queue, val se
     /**
      * Triggers the entry to get flushed if it's not already flushed.
      */
-    def flush = {}
+    def flush(asap:Boolean) = {}
 
     /**
      * Takes the current entry out of the prefetch of all subscriptions
-     * which have prefetched the entry.  Returns the list of subscriptions which
-     * had prefetched it.
+     * which have prefetched the entry.  Runs the supplied block, then
+     * refills the prefetch of those subs that were affected.
      */
-    def prefetch_remove = {
-      var rc = List[Subscription]()
+    def with_prefetch_droped(func: =>Unit ):Unit = {
       if( queue.tune_flush_to_store ) {
-        // Update the prefetch counter to reflect that this entry is no longer being prefetched.
-        var cur = entry
-        while( cur!=null && is_prefetched ) {
-          if( cur.hasSubs ) {
-            (cur.parked).foreach { sub =>
-              if( sub.is_prefetched(entry) ) {
-                sub.remove_from_prefetch(entry)
-                rc ::= sub
+
+        // drop the prefetch
+        val expected = prefetched
+        var prefechingSubs = List[Subscription]()
+        if( queue.tune_flush_to_store ) {
+          // Update the prefetch counter to reflect that this entry is no longer being prefetched.
+          var cur = entry
+          while( cur!=null && is_prefetched ) {
+            if( cur.hasSubs ) {
+              (cur.parked).foreach { case sub:PrefetchingSubscription =>
+                if( sub.is_prefetched(entry) ) {
+                  sub.remove_from_prefetch(entry)
+                  prefechingSubs ::= sub
+                }
               }
             }
+            cur = cur.getPrevious
           }
-          cur = cur.getPrevious
         }
-        assert(!is_prefetched, "entry should not be prefetched.")
+        if(prefetched!=0) {
+          assert(prefetched==0, "entry should not be prefetched.")
+        }
+        assert(expected == prefechingSubs.size, "should get all the subs")
+
+        func
+
+        // refill the prefetch
+        prefechingSubs.foreach{ case sub =>
+          sub.refill_prefetch
+        }
+
+      } else {
+        func
       }
-      rc
     }
 
     /**
@@ -785,16 +811,16 @@ class QueueEntry(val queue:Queue, val se
      * as a result of an aquired ack.
      */
     def remove = {
-      // take us out of subscription prefetches..
-      var refill_preftch_list = prefetch_remove
-      // advance subscriptions that were on this entry..
-      parked.foreach(_.advance(next))
-      nextOrTail :::= parked
-      parked = Nil
-      // take the entry of the entries list..
-      unlink
-      // refill the subscription prefetches..
-      refill_preftch_list.foreach( _.refill_prefetch )
+      with_prefetch_droped {
+
+        // advance subscriptions that were on this entry..
+        advance(parked)
+        parked = Nil
+
+        // take the entry of the entries list..
+        unlink
+
+      }
     }
 
     /**
@@ -835,7 +861,7 @@ class QueueEntry(val queue:Queue, val se
 
     override def remove = throw new AssertionError("Head entry cannot be removed")
     override def load = throw new AssertionError("Head entry cannot be loaded")
-    override def flush = throw new AssertionError("Head entry cannot be flushed")
+    override def flush(asap:Boolean) = throw new AssertionError("Head entry cannot be flushed")
   }
 
   /**
@@ -850,7 +876,7 @@ class QueueEntry(val queue:Queue, val se
 
     override def remove = throw new AssertionError("Tail entry cannot be removed")
     override def load = throw new AssertionError("Tail entry cannot be loaded")
-    override def flush = throw new AssertionError("Tail entry cannot be flushed")
+    override def flush(asap:Boolean) = throw new AssertionError("Tail entry cannot be flushed")
 
   }
 
@@ -883,7 +909,7 @@ class QueueEntry(val queue:Queue, val se
       })
     }
 
-    override def flush() = {
+    override def flush(asap:Boolean) = {
       if( queue.tune_flush_to_store ) {
         if( stored ) {
           flushing=true
@@ -895,7 +921,9 @@ class QueueEntry(val queue:Queue, val se
 
             // The storeBatch is only set when called from the messages.offer method
             if( delivery.uow!=null ) {
-              delivery.uow.completeASAP
+              if( asap ) {
+                delivery.uow.completeASAP
+              }
             } else {
 
               // Are swapping out a non-persistent message?
@@ -905,14 +933,18 @@ class QueueEntry(val queue:Queue, val se
                 val uow = delivery.uow
                 delivery.storeKey = uow.store(delivery.createMessageRecord)
                 store
-                uow.completeASAP
+                if( asap ) {
+                  uow.completeASAP
+                }
                 uow.release
                 delivery.uow = null
 
               } else {
-
-                queue.host.store.flushMessage(messageKey) {
-                  queue.store_flush_source.merge(this)
+                  
+                if( asap ) {
+                  queue.host.store.flushMessage(messageKey) {
+                    queue.store_flush_source.merge(this)
+                  }
                 }
 
               }
@@ -987,7 +1019,7 @@ class QueueEntry(val queue:Queue, val se
             } else {
 
               // Is the sub flow controlled?
-              if( sub.full || (sub.prefetchFull && !sub.is_prefetched(entry) ) ) {
+              if( sub.full ) {
                 // hold back: flow controlled
                 heldBack += sub
               } else {
@@ -1028,7 +1060,7 @@ class QueueEntry(val queue:Queue, val se
         def haveQuickConsumer = queue.fast_subscriptions.find( sub=> sub.pos.seq <= seq ).isDefined
         if( !hasSubs && !is_prefetched && !acquired && !haveQuickConsumer ) {
           // then flush out to make space...
-          flush
+          flush(false)
         }
         return true
       }
@@ -1161,17 +1193,18 @@ class QueueEntry(val queue:Queue, val se
               queue.enqueue_size_counter += size_delta
             }
 
-            var refill_preftch_list = prefetch_remove
+            with_prefetch_droped {
+
+              linkAfter(tmpList)
+              val next = getNext
 
-            linkAfter(tmpList)
-            val next = getNext
+              // move the subs to the first entry that we just loaded.
+              parked.foreach(_.advance(next))
+              next :::= parked
 
-            // move the subs to the first entry that we just loaded.
-            parked.foreach(_.advance(next))
-            next :::= parked
+              unlink
 
-            unlink
-            refill_preftch_list.foreach( _.refill_prefetch )
+            }
           }
         }
       }
@@ -1185,12 +1218,18 @@ class QueueEntry(val queue:Queue, val se
 
 }
 
-
+/**
+ * Interfaces a DispatchConsumer with a Queue.  Tracks current position of the consumer
+ * on the queue, and the delivery rate so that slow consumers can be detected.  It also
+ * tracks the entries which the consumer has acquired.
+ *
+ */
 class Subscription(queue:Queue) extends DeliveryProducer with DispatchLogging {
   override protected def log = Queue
 
   def dispatchQueue = queue.dispatchQueue
 
+  val id = Queue.subcsription_counter.incrementAndGet
   var acquired = new LinkedNodeList[AcquiredQueueEntry]
   var session: DeliverySession = null
   var pos:QueueEntry = null
@@ -1204,13 +1243,11 @@ class Subscription(queue:Queue) extends 
 
   def slow = slow_intervals > queue.tune_max_slow_intervals
 
-  var prefetch_tail:QueueEntry = null
-  var prefetched_size = 0
   var acquired_size = 0L
 
   override def toString = {
     def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
-    "{ acquired_size: "+acquired_size+", prefetch_size: "+prefetched_size+", pos: "+seq(pos)+" prefetch_tail: "+seq(prefetch_tail)+", tail_parkings: "+tail_parkings+"}"
+    "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+", tail_parkings: "+tail_parkings+"}"
   }
 
   def browser = session.consumer.browser
@@ -1230,10 +1267,8 @@ class Subscription(queue:Queue) extends 
 
   def close() = {
     pos -= this
-
-    invalidate_prefetch
-
     pos = null
+
     session.refiller = null
     session.close
     session = null
@@ -1252,21 +1287,16 @@ class Subscription(queue:Queue) extends 
    * queue entry.
    */
   def advance(value:QueueEntry):Unit = {
-    assert(value!=null)
-    assert(pos!=null) 
-
-    // Remove the previous pos from the prefetch counters.
-    if( prefetch_tail!=null && !pos.is_head) {
-      remove_from_prefetch(pos)
-    }
 
+    assert(value!=null)
+    assert(pos!=null)
 
     advanced_size += pos.size
 
     pos = value
     session.refiller = pos
 
-    refill_prefetch()
+    refill_prefetch
     if( tail_parked ) {
       tail_parkings += 1
     }
@@ -1278,90 +1308,20 @@ class Subscription(queue:Queue) extends 
    */
   def rewind(value:QueueEntry):Unit = {
     assert(value!=null)
-    invalidate_prefetch
     pos = value
     session.refiller = pos
     queue.dispatchQueue << value // queue up the entry to get dispatched..
   }
 
-  def invalidate_prefetch: Unit = {
-    if (prefetch_tail != null) {
-      // release the prefetch counters...
-      var cur = pos
-      while (cur.seq <= prefetch_tail.seq) {
-        if (!cur.is_head) {
-          prefetched_size -= cur.size
-          cur.prefetched -= 1
-        }
-        cur = cur.nextOrTail
-      }
-      assert(prefetched_size == 0, "inconsistent prefetch size.")
-    }
-  }
-
-
-  /**
-   * Is the specified queue entry prefeteched by this subscription?
-   */
-  def is_prefetched(value:QueueEntry) = {
-    prefetch_tail!=null && value!=null && pos.seq <= value.seq && value.seq <= prefetch_tail.seq
-  }
-
-
-  def add_to_prefetch(entry:QueueEntry):Unit = {
-    assert( !entry.is_head, "tombstones should not be prefetched..")
-    prefetched_size += entry.size
-    entry.prefetched += 1
-    entry.load
-    prefetch_tail = entry
-  }
-
-  def remove_from_prefetch(entry:QueueEntry):Unit = {
-    prefetched_size -= entry.size
-    entry.prefetched -= 1
-
-    if( entry == prefetch_tail ) {
-      prefetch_tail = prefetch_tail.getPrevious;
-      if( prefetch_tail==null || prefetch_tail.seq < pos.seq ) {
-        prefetch_tail = null
-        assert( prefetched_size == 0 , "inconsistent prefetch size.")
-      }
-    } else {
-      assert( prefetched_size >= 0 , "inconsistent prefetch size.")
-    }
-  }
-
-  def refill_prefetch() = {
-    if( queue.tune_flush_to_store ) {
-      def next_prefetch_pos = if(prefetch_tail==null) {
-        if( !pos.is_tail ) {
-          pos
-        } else {
-          null
-        }
-      } else  {
-        prefetch_tail.getNext
-      }
-
-      // attempts to fill the prefetch...
-      var next = next_prefetch_pos
-      while( !prefetchFull && next!=null ) {
-        if( !next.is_head ) {
-          add_to_prefetch(next)
-        }
-        next = next.getNext
-      }
-    }
-  }
-
-  def prefetchFull() = acquired_size + prefetched_size >= queue.tune_consumer_buffer
-
   def tail_parked = pos eq queue.tail_entry
 
   def matches(entry:Delivery) = session.consumer.matches(entry)
   def full = session.full
   def offer(delivery:Delivery) = session.offer(delivery)
 
+  def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
+
+  def refill_prefetch = {}
 
   class AcquiredQueueEntry(val entry:QueueEntry) extends LinkedNode[AcquiredQueueEntry] {
 
@@ -1420,7 +1380,117 @@ class Subscription(queue:Queue) extends 
     }
   }
 
+}
 
-  def acquire(entry:QueueEntry) = new AcquiredQueueEntry(entry)
+/**
+ * A subscription which issues message load requests so that messages are prefetched from
+ * the store before they are needed for dispatching purposes.
+ */
+class PrefetchingSubscription(queue:Queue) extends Subscription(queue)  {
 
-}
+  var prefetch_head:QueueEntry = null
+  var prefetch_tail:QueueEntry = null
+  var prefetched_size = 0
+
+  override def toString = {
+    def seq(entry:QueueEntry) = if(entry==null) null else entry.seq
+    "{ id: "+id+", acquired_size: "+acquired_size+", pos: "+seq(pos)+", prefetch_size: "+prefetched_size+", prefetch_head: "+seq(prefetch_head)+", prefetch_tail: "+seq(prefetch_tail)+", tail_parkings: "+tail_parkings+", prefetchFull: "+prefetch_full+"}"
+  }
+
+
+  override def advance(value:QueueEntry):Unit = {
+    super.advance(value)
+    refill_prefetch // update the prefetch window.
+  }
+
+
+  override def rewind(value: QueueEntry) = {
+    invalidate_prefetch
+    super.rewind(value)
+  }
+
+
+  override def close() = {
+    invalidate_prefetch
+    super.close
+  }
+
+  def prefetch_full = acquired_size + prefetched_size >= queue.tune_consumer_buffer
+
+  override def refill_prefetch() = {
+
+    // first lets reclaim prefetch space
+    while( prefetch_head!=null && prefetch_head < pos ) {
+      remove_from_prefetch(prefetch_head)
+    }
+
+    // now lets fill the prefetch if it has capacity.
+    if( !prefetch_full ) {
+
+      var next = if(prefetch_tail==null) {
+        if( pos.is_tail ) {
+          null // can't prefetch the tail..
+        } else if( pos.is_head ) {
+          pos.getNext // can't prefetch the head.
+        } else {
+          pos // start prefetching from the current position.
+        }
+      } else  {
+        prefetch_tail.getNext // continue prefetching from the last prefetch tail
+      }
+
+      while( !prefetch_full && next!=null ) {
+
+        prefetched_size += next.size
+        next.prefetched += 1
+        next.load
+
+        if( prefetch_head==null ) {
+          prefetch_head = next
+        }
+        prefetch_tail = next
+
+        next = next.getNext
+      }
+    }
+  }
+
+
+
+  /**
+   * Is the specified queue entry prefetched by this subscription?
+   */
+  def is_prefetched(value:QueueEntry) = {
+    assert(value!=null)
+    prefetch_head!=null && prefetch_head <= value && value <= prefetch_tail
+  }
+
+  def remove_from_prefetch(entry:QueueEntry):Unit = {
+    prefetched_size -= entry.size
+    entry.prefetched -= 1
+
+    if( entry == prefetch_head ) {
+      if( entry == prefetch_tail ) {
+        prefetch_head = null
+        prefetch_tail = null
+        assert( prefetched_size == 0 , "inconsistent prefetch size.")
+      } else {
+        prefetch_head = prefetch_head.getNext
+        assert( prefetched_size != 0 , "inconsistent prefetch size.")
+      }
+    } else {
+      if( entry == prefetch_tail ) {
+        prefetch_tail = prefetch_tail.getPrevious
+      }
+      assert( prefetched_size != 0 , "inconsistent prefetch size.")
+    }
+  }
+
+  def invalidate_prefetch: Unit = {
+    while (prefetch_head !=null ) {
+      remove_from_prefetch(prefetch_head)
+    }
+    assert(prefetched_size == 0, "inconsistent prefetch size.")
+  }
+
+}
\ No newline at end of file