You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:14:50 UTC

svn commit: r961185 - /activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Author: chirino
Date: Wed Jul  7 04:14:50 2010
New Revision: 961185

URL: http://svn.apache.org/viewvc?rev=961185&view=rev
Log:
combine flushed queue entries to reduce memory usage.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961185&r1=961184&r2=961185&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:14:50 2010
@@ -94,10 +94,6 @@ class Queue(val host: VirtualHost, val d
   var tail_entry = new QueueEntry(this, next_message_seq).tail
   entries.addFirst(head_entry)
 
-  var loading_size = 0
-  var flushing_size = 0
-
-
   //
   // Tuning options.
   //
@@ -140,7 +136,7 @@ class Queue(val host: VirtualHost, val d
    * reference pointers to the actual messages.  When not loaded,
    * the batch is referenced as sequence range to conserve memory.
    */
-  def tune_entry_group_size = 10000
+  def tune_flush_range_size = 10000
 
   /**
    * The number of intervals that a consumer must not meeting the subscription rate before it is
@@ -155,11 +151,13 @@ class Queue(val host: VirtualHost, val d
   var nack_item_counter = 0L
   var nack_size_counter = 0L
 
-  var flushed_items = 0L
-
   def queue_size = enqueue_size_counter - dequeue_size_counter
   def queue_items = enqueue_item_counter - dequeue_item_counter
 
+  var loading_size = 0
+  var flushing_size = 0
+  var flushed_items = 0
+
   private var capacity = tune_producer_buffer
   var size = 0
 
@@ -181,7 +179,7 @@ class Queue(val host: VirtualHost, val d
     }
 
     if( tune_persistent ) {
-      host.store.listQueueEntryRanges(queueKey, tune_entry_group_size) { ranges=>
+      host.store.listQueueEntryRanges(queueKey, tune_flush_range_size) { ranges=>
         dispatchQueue {
           if( !ranges.isEmpty ) {
 
@@ -276,15 +274,15 @@ class Queue(val host: VirtualHost, val d
     var total_items = 0L
     var total_size = 0L
     while (cur != null) {
-      if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_flushed_group ) {
+      if (cur.is_loaded || cur.hasSubs || cur.is_prefetched || cur.is_flushed_range ) {
         info("  => " + cur)
       }
 
       total_size += cur.size
       if (cur.is_flushed || cur.is_loaded) {
         total_items += 1
-      } else if (cur.is_flushed_group ) {
-        total_items += cur.as_flushed_group.count
+      } else if (cur.is_flushed_range ) {
+        total_items += cur.as_flushed_range.count
       }
       
       cur = cur.getNext
@@ -304,19 +302,16 @@ class Queue(val host: VirtualHost, val d
   def schedual_slow_consumer_check:Unit = {
 
     def slowConsumerCheck = {
-      if( retained > 0 ) {
+      if( serviceState.isStarted ) {
 
         // Handy for periodically looking at the dispatch state...
         check_counter += 1
 
-        if( (check_counter%10)==0  ) {
-//          display_stats
-        }
-
         if( (check_counter%25)==0 ) {
-//          if (!all_subscriptions.isEmpty) {
-//            display_active_entries
-//          }
+          display_stats
+          if (!all_subscriptions.isEmpty) {
+            display_active_entries
+          }
         }
 
         // target tune_min_subscription_rate / sec
@@ -366,38 +361,83 @@ class Queue(val host: VirtualHost, val d
         }
 
 
-        // If we no longer have fast subs...
-        if( startedWithFastSubs && fast_subscriptions.isEmpty ) {
-          // Flush out the tail entries..
-          var cur = entries.getTail
-          while( cur!=null ) {
-            if( !cur.hasSubs && !cur.is_prefetched ) {
-              cur
+        if (tune_flush_to_store) {
+
+          // If we no longer have fast subs...
+          if( startedWithFastSubs && fast_subscriptions.isEmpty ) {
+
+            // flush tail entries that are still loaded but which have no fast subs that can process them.
+            var cur = entries.getTail
+            while( cur!=null ) {
+              def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= cur.seq ).isDefined
+              if( cur.is_loaded && !cur.hasSubs && !cur.is_prefetched && !cur.as_loaded.acquired && !haveQuickConsumer ) {
+                // then flush out to make space...
+                cur.flush(true)
+                cur = cur.getPrevious
+              } else {
+                cur = null
+              }
             }
-            cur = cur.getPrevious
+
           }
 
 
-        }
+          // Combine flushed items into flushed ranges
+          if( flushed_items > tune_flush_range_size*2 ) {
 
-        // flush tail entries that are still loaded but which have no fast subs that can process them.
-        var cur = entries.getTail
-        while( cur!=null ) {
-          def haveQuickConsumer = fast_subscriptions.find( sub=> sub.pos.seq <= cur.seq ).isDefined
-          if( cur.is_loaded && !cur.hasSubs && !cur.is_prefetched && !cur.as_loaded.acquired && !haveQuickConsumer ) {
-            // then flush out to make space...
-            cur.flush(true)
-            cur = cur.getPrevious
-          } else {
-            cur = null
-          }
-        }
+            println("Looking for flushed entries to combine")
+
+            var distance_from_sub = tune_flush_range_size;
+            var cur = entries.getHead
+            var combine_counter = 0;
+
+            while( cur!=null ) {
+
+              // get the next now.. since cur may get combined and unlinked
+              // from the entry list.
+              val next = cur.getNext
+
+              if( cur.hasSubs || cur.is_prefetched ) {
+                distance_from_sub = 0
+              } else {
+                distance_from_sub += 1
+                if( cur.can_combine_with_prev ) {
+                  cur.getPrevious.as_flushed_range.combineNext
+                } else {
+                  if( cur.is_flushed && distance_from_sub > tune_flush_range_size ) {
+                    cur.flush_range
+                  }
+                }
 
+              }
+              cur = next
+            }
 
-        // Trigger a swap if we have consumers waiting for messages and we are full..
-        if( idleConsumerCount > 0 && messages.full && flushing_size==0 ) {
-          swap
+            println("combined "+combine_counter+" entries")
+
+          }
+
+//          // Trigger a swap if we have consumers waiting for messages and we are full..
+//          if( idleConsumerCount > 0 && messages.full && flushing_size==0 ) {
+//
+//            debug("swapping...")
+//            var entry = head_entry.getNext
+//            while( entry!=null ) {
+//              val loaded = entry.as_loaded
+//
+//              // Keep around prefetched and loaded entries.
+//              if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
+//                entry.load
+//              } else {
+//                // flush the the others out of memory.
+//                entry.flush(true)
+//              }
+//              entry = entry.getNext
+//            }
+//
+//          }
         }
+
         schedual_slow_consumer_check
       }
     }
@@ -519,35 +559,6 @@ class Queue(val host: VirtualHost, val d
     rc
   }
 
-  /**
-   * Prioritizes all the queue entries so that entries most likely to be consumed
-   * next are a higher priority.  All messages with the highest priority are loaded
-   * and messages with the lowest priority are flushed to make room to accept more
-   * messages from the producer.
-   */
-  def swap():Unit = {
-    if( !host.serviceState.isStarted ) {
-      return
-    }
-    
-    debug("swapping...")
-
-    var entry = head_entry.getNext
-    while( entry!=null ) {
-      val loaded = entry.as_loaded
-
-      // Keep around prefetched and loaded entries.
-      if( entry.is_prefetched || (loaded!=null && loaded.acquired)) {
-        entry.load
-      } else {
-        // flush the the others out of memory.
-        entry.flush(true)
-      }
-      entry = entry.getNext
-    }
-  }
-
-
   val store_flush_source = createSource(new ListEventAggregator[QueueEntry#Loaded](), dispatchQueue)
   store_flush_source.setEventHandler(^ {drain_store_flushes});
   store_flush_source.resume
@@ -700,7 +711,7 @@ class QueueEntry(val queue:Queue, val se
   def as_tail = state.as_tail
 
   def as_flushed = state.as_flushed
-  def as_flushed_group = state.as_flushed_group
+  def as_flushed_range = state.as_flushed_range
   def as_loaded = state.as_loaded
 
   def is_tail = this == queue.tail_entry
@@ -708,9 +719,10 @@ class QueueEntry(val queue:Queue, val se
 
   def is_loaded = as_loaded!=null
   def is_flushed = as_flushed!=null
-  def is_flushed_group = as_flushed_group!=null
+  def is_flushed_range = as_flushed_range!=null
 
   // These should not change the current state.
+  def count = state.count
   def size = state.size
   def messageKey = state.messageKey
   def is_flushed_or_flushing = state.is_flushed_or_flushing
@@ -721,6 +733,15 @@ class QueueEntry(val queue:Queue, val se
   def load = state.load
   def remove = state.remove
 
+  def flush_range = state.flush_range
+
+  def can_combine_with_prev = {
+    getPrevious !=null &&
+      getPrevious.is_flushed_range &&
+        ( is_flushed || is_flushed_range ) &&
+          (getPrevious.count + count  < queue.tune_flush_range_size)
+  }
+
   trait EntryState {
 
     final def entry:QueueEntry = QueueEntry.this
@@ -728,7 +749,7 @@ class QueueEntry(val queue:Queue, val se
     def as_tail:Tail = null
     def as_loaded:Loaded = null
     def as_flushed:Flushed = null
-    def as_flushed_group:FlushedRange = null
+    def as_flushed_range:FlushedRange = null
     def as_head:Head = null
 
     /**
@@ -737,6 +758,11 @@ class QueueEntry(val queue:Queue, val se
     def size = 0
 
     /**
+     * Gets number of messages that this entry represents
+     */
+    def count = 0
+
+    /**
      * Gets the message key for the entry.
      * @returns -1 if it is not known.
      */
@@ -763,6 +789,8 @@ class QueueEntry(val queue:Queue, val se
      */
     def flush(asap:Boolean) = {}
 
+    def flush_range:Unit = throw new AssertionError("should only be called on flushed entries");
+
     /**
      * Takes the current entry out of the prefetch of all subscriptions
      * which have prefetched the entry.  Runs the partial function then
@@ -893,6 +921,7 @@ class QueueEntry(val queue:Queue, val se
 
     override def toString = { "loaded:{ stored: "+stored+", flushing: "+flushing+", acquired: "+acquired+", size:"+size+"}" }
 
+    override def count = 1
     override def size = delivery.size
     override def messageKey = delivery.storeKey
 
@@ -962,6 +991,10 @@ class QueueEntry(val queue:Queue, val se
         queue.flushing_size-=size
         queue.size -= size
         state = new Flushed(delivery.storeKey, size)
+
+        if( can_combine_with_prev ) {
+          getPrevious.as_flushed_range.combineNext
+        }
       }
     }
 
@@ -1074,8 +1107,12 @@ class QueueEntry(val queue:Queue, val se
    */
   class Flushed(override val messageKey:Long, override val size:Int) extends EntryState {
 
+    queue.flushed_items += 1
+
     var loading = false
 
+    override def count = 1
+
     override def as_flushed = this
 
     override def is_flushed_or_flushing = true
@@ -1121,6 +1158,7 @@ class QueueEntry(val queue:Queue, val se
         delivery.storeKey = messageRecord.key
 
         queue.size += size
+        queue.flushed_items -= 1
         state = new Loaded(delivery, true)
       } else {
 //        debug("Ignoring store load of: ", messageKey)
@@ -1133,8 +1171,18 @@ class QueueEntry(val queue:Queue, val se
         loading = false
         queue.loading_size -= size
       }
+      queue.flushed_items -= 1
       super.remove
     }
+
+    override def flush_range = {
+      if( loading ) {
+        loading = false
+        queue.loading_size -= size
+      }
+      queue.flushed_items -= 1
+      state = new FlushedRange(seq, 1, size)
+    }
   }
 
   /**
@@ -1151,13 +1199,16 @@ class QueueEntry(val queue:Queue, val se
     /** the last seq id in the range */
     var last:Long,
     /** the number of items in the range */
-    var count:Int,
+    var _count:Int,
     /** size in bytes of the range */
-    override val size:Int) extends EntryState {
+    var _size:Int) extends EntryState {
+
+    override def count = _count
+    override def size = _size
 
     var loading = false
 
-    override def as_flushed_group = this
+    override def as_flushed_range = this
 
     override def is_flushed_or_flushing = true
 
@@ -1210,9 +1261,28 @@ class QueueEntry(val queue:Queue, val se
       }
     }
 
-    override def remove = {
-      throw new AssertionError("Flushed range cannbot be removed.");
+    /**
+     * Combines this queue entry with the next queue entry.
+     */
+    def combineNext():Unit = {
+      val value = getNext
+      assert(value!=null)
+      assert(value.is_flushed || value.is_flushed_range)
+      if( value.is_flushed ) {
+        assert(last < value.seq )
+        last = value.seq
+        _count += 1
+        _size += value.size
+        value.remove
+      } else if( value.is_flushed_range ) {
+        assert(last < value.seq )
+        last = value.as_flushed_range.last
+        _count += value.as_flushed_range.count
+        _size += value.size
+        value.remove
+      }
     }
+
   }
 
 
@@ -1476,13 +1546,17 @@ class PrefetchingSubscription(queue:Queu
         assert( prefetched_size == 0 , "inconsistent prefetch size.")
       } else {
         prefetch_head = prefetch_head.getNext
-        assert( prefetched_size != 0 , "inconsistent prefetch size.")
+        if( prefetched_size == 0 ) {
+          assert( prefetched_size != 0 , "inconsistent prefetch size.")
+        }
       }
     } else {
       if( entry == prefetch_tail ) {
         prefetch_tail = prefetch_tail.getPrevious
       }
-      assert( prefetched_size != 0 , "inconsistent prefetch size.")
+      if( prefetched_size == 0 ) {
+        assert( prefetched_size != 0 , "inconsistent prefetch size.")
+      }
     }
   }