You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:10:36 UTC

svn commit: r961144 - /activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Author: chirino
Date: Wed Jul  7 04:10:35 2010
New Revision: 961144

URL: http://svn.apache.org/viewvc?rev=961144&view=rev
Log:
working towards better consumer decoupling on a single queue, and better message prefetching from storage

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961144&r1=961143&r2=961144&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:10:35 2010
@@ -104,21 +104,47 @@ class Queue(val host: VirtualHost, val d
   var flushingSize = 0
   var storeId: Long = -1L
 
+  //
+  // Tuning options.
+  //
+
+  /**
+   *  The amount of memory buffer space for receiving messages.
+   */
+  var tune_inbound_buffer = 1024 * 32
+
+  /**
+   *  The amount of memory buffer space to use per subscription.
+   */
+  var tune_subscription_buffer = 1024*32
+
   /**
-   * Tunning options.
+   * Subscribers that consume slower than this rate per second will be considered
+   * slow.
    */
-  var tune_max_size = 1024 * 256
-  var tune_subscription_prefetch = 1024*32
-  var tune_max_outbound_size = 1024 * 1204 * 5
-  var tune_swap_delay = 100L
+  var tune_slow_subscription_rate = 1000*1024
+
+  /**
+   * The number of milliseconds between slow consumer checks.
+   */
+  var tune_slow_check_interval = 100L
+
+  /**
+   * The number of intervals in which a consumer must fail to meet the subscription rate before it is
+   * flagged as a slow consumer.
+   */
+  var tune_max_slow_intervals = 10
 
   var enqueue_counter = 0L
   var dequeue_counter = 0L
   var enqueue_size = 0L
   var dequeue_size = 0L
 
+  private var capacity = tune_inbound_buffer
   private var size = 0
 
+  schedualSlowConsumerCheck
+
   def restore(storeId: Long, records:Seq[QueueEntryRecord]) = ^{
     this.storeId = storeId
     if( !records.isEmpty ) {
@@ -135,11 +161,15 @@ class Queue(val host: VirtualHost, val d
     }
   } >>: dispatchQueue
 
+  def addCapacity(amount:Int) = {
+    capacity += amount
+  }
+
   object messages extends Sink[Delivery] {
 
     var refiller: Runnable = null
 
-    def full = if(size >= tune_max_size)
+    def full = if(size >= capacity)
       true
     else
       false
@@ -166,54 +196,42 @@ class Queue(val host: VirtualHost, val d
           queueDelivery.storeBatch.enqueue(entry.createQueueEntryRecord)
         }
 
-        var swap_check = false
-        if( !entry.hasSubs ) {
+        // do we have at least 1 subscription that is keeping up with the producers
+        // and is interested in this message?
+//        val hold = consumerSubs.valuesIterator.find( sub=> !sub.slow && sub.matches(delivery) ).isDefined
+        def haveQuickConsumer = consumerSubs.valuesIterator.find( sub=> !sub.slow ).isDefined
+
+        var dispatched = false
+        if( entry.prefetched > 0 || haveQuickConsumer ) {
+          // try to dispatch it directly...
+//          println("hold: "+delivery.message.getProperty("color"))
+          entry.dispatch
+
+        } else {
+//          println("flush: "+delivery.message.getProperty("color"))
           // we flush the entry out right away if it looks
           // it wont be needed.
-
-          if( entry.getPrevious.isFlushedOrFlushing ) {
-            // in this case take it out of memory too...
-            flushingSize += entry.flush
-          } else {
-            if( slow_consumers ) {
-              if( delivery.storeBatch!=null ) {
-                // just make it hit the disk quick.. but keep it in memory.
-                delivery.storeBatch.eagerFlush(^{})
-              }
-            } else {
-              if( !checking_for_slow_consumers ) {
-                checking_for_slow_consumers=true
-                val tail_consumer_counter_copy = tail_consumer_counter
-                dispatchQueue.dispatchAfter(tune_swap_delay, TimeUnit.MILLISECONDS, ^{
-                  if( tail_consumer_counter_copy == tail_consumer_counter ) {
-                    slow_consumers = true
-                  }
-                  checking_for_slow_consumers = false
-                })
-              }
-            }
-            swap_check=true
+          entry.flush
+          if( full ) {
+            println("full... waiting for flushes");
           }
-        } else {
-          slow_consumers = false
-          tail_consumer_counter += 1
-          //  entry.dispatch==null if the entry was fully dispatched
-          swap_check = entry.dispatch!=null
-        }
 
-        // Does it look like we need to start swapping to make room
-        // for more messages?
-        if( swap_check && host.store!=null &&  full ) {
-          val wasAt = dequeue_size
-          dispatchQueue.dispatchAfter(tune_swap_delay, TimeUnit.MILLISECONDS, ^{
-            // start swapping if was still blocked after a short delay
-            if( dequeue_size == wasAt && full ) {
-              println("swapping...")
-              swap
-            }
-          })
+          // just make it hit the disk quick.. but keep it in memory.
+          // delivery.storeBatch.eagerFlush(^{})
         }
 
+//        // Does it look like we need to start swapping to make room
+//        // for more messages?
+//        if( !dispatched && host.store!=null && full ) {
+//          val wasAt = dequeue_size
+//          dispatchQueue.dispatchAfter(tune_slow_check_interval, TimeUnit.MILLISECONDS, ^{
+//            // start swapping if was still blocked after a short delay
+//            if( dequeue_size == wasAt && full ) {
+//              swap
+//            }
+//          })
+//        }
+
         // release the store batch...
         if (queueDelivery.storeBatch != null) {
           queueDelivery.storeBatch.release
@@ -225,9 +243,62 @@ class Queue(val host: VirtualHost, val d
     }
   }
 
-  var tail_consumer_counter = 0L
-  var checking_for_slow_consumers = false
-  var slow_consumers = false
+  def schedualSlowConsumerCheck:Unit = {
+
+    def slowConsumerCheck = {
+      if( retained > 0 ) {
+
+        // target tune_slow_subscription_rate / sec
+        val slowCursorDelta = (((tune_slow_subscription_rate) * tune_slow_check_interval) / 1000).toInt
+
+        var idleConsumerCount = 0
+
+        consumerSubs.foreach{ case (consumer, sub)=>
+
+          // Skip over new consumers...
+          if( sub.cursoredCounter != 0 ) {
+
+            val cursorDelta = sub.cursoredCounter - sub.prevCursoredCounter 
+            sub.prevCursoredCounter = sub.cursoredCounter
+
+            // The subscription is NOT slow if it's been tail parked, or if
+            // it's been parking and cursoring through the data at the tune_slow_subscription_rate
+            if( (sub.tailParked && sub.tailParkings==0) || ( sub.tailParkings > 0 && cursorDelta >= slowCursorDelta ) ) {
+              if( sub.slow ) {
+                debug("consumer is no longer slow: %s", consumer)
+                sub.slowIntervals = 0
+              }
+            } else {
+              if( !sub.slow ) {
+                debug("slow interval: %d, %d, %d", sub.slowIntervals, sub.tailParkings, cursorDelta)
+                sub.slowIntervals += 1
+                if( sub.slow ) {
+                  debug("consumer is slow: %s", consumer)
+                }
+              }
+            }
+
+            // has the consumer been stuck at the tail?
+            if( sub.tailParked && sub.tailParkings==0 ) {
+              idleConsumerCount += 1;
+            }
+
+            sub.tailParkings = 0
+          }
+        }
+
+        // Trigger a swap if we have idle (tail parked) consumers, we are full, and nothing is flushing..
+        if( idleConsumerCount > 0 && messages.full && flushingSize==0 ) {
+          swap
+        }
+        schedualSlowConsumerCheck
+      }
+    }
+
+    dispatchQueue.dispatchAfter(tune_slow_check_interval, TimeUnit.MILLISECONDS, ^{
+      slowConsumerCheck
+    })
+  }
 
   def ack(entry: QueueEntry, sb:StoreBatch) = {
     if (entry.ref != -1) {
@@ -250,8 +321,6 @@ class Queue(val host: VirtualHost, val d
     dequeue_size += entry.size
     size -= entry.size
     entry.tombstone
-
-    messages.refiller.run
   }
 
 
@@ -266,6 +335,9 @@ class Queue(val host: VirtualHost, val d
         entry.unlink
         ack(entry.value, tx)
     }
+    
+//    println("acked... full: "+messages.full)
+    messages.refiller.run
   }
   
   /////////////////////////////////////////////////////////////////////
@@ -327,6 +399,7 @@ class Queue(val host: VirtualHost, val d
       val subscription = new Subscription(this)
       subscription.connect(consumer)
       consumerSubs += consumer -> subscription
+      addCapacity( tune_subscription_buffer )
     }
   } >>: dispatchQueue
 
@@ -336,6 +409,7 @@ class Queue(val host: VirtualHost, val d
         case Some(cs) =>
           cs.close
           consumerSubs -= consumer
+          addCapacity( -tune_subscription_buffer )
         case None =>
       }
     }
@@ -362,78 +436,25 @@ class Queue(val host: VirtualHost, val d
    * messages from the producer.
    */
   def swap():Unit = {
-
     if( !host.serviceState.isStarted ) {
       return
     }
-
-    class Prio(val entry:QueueEntry) extends Comparable[Prio] {
-      var value = 0
-      def compareTo(o: Prio) = o.value - value
-    }
-
-    val prios = new ArrayList[Prio](entries.size())
-
+    
+    debug("swapping...")
     var entry = entries.getHead
     while( entry!=null ) {
+      println(entries)
       if( entry.asTombstone == null ) {
-        prios.add(new Prio(entry))
-      }
-      entry = entry.getNext
-    }
-
-
-    /**
-     * adds keep priority to the range of entries starting at x
-     * and spanning the size provided.
-     */
-    def prioritize(i:Int, size:Int, p:Int):Unit = {
-      val prio = prios.get(i)
-      prio.value += p
-      val remainingSize = size - prio.entry.size
-      if( remainingSize > 0 ) {
-        val next = i + 1
-        if( next < prios.size ) {
-          prioritize(next, remainingSize, p-1)
-        }
-      }
-    }
 
-    // Prioritize the entries so that higher priority entries are swapped in,
-    // and lower priority entries are swapped out.
-    var i = 0
-    while( i < prios.size ) {
-      val prio = prios.get(i)
-      if( prio.entry.hasSubs ) {
-
-        var credits =0;
-        if( prio.entry.competing != Nil) {
-          credits += prio.entry.competing.size * tune_subscription_prefetch
-        } else{
-          if( prio.entry.browsing != Nil ) {
-            credits += tune_subscription_prefetch
-          }
+        // only keep prefetch entries around..
+        if( entry.prefetched == 0 ) {
+          entry.flush
+        } else {
+          entry.load
         }
-        prioritize(i, credits, 1000)
-
       }
-      i += 1
-    }
 
-    Collections.sort(prios)
-
-    var remaining = tune_max_size / 2
-    i = 0
-    while( i < prios.size ) {
-      val prio = prios.get(i)
-      val entry = prio.entry
-      if( remaining > 0 ) {
-        loadingSize += entry.load
-        remaining -= entry.size
-      } else {
-        flushingSize += entry.flush
-      }
-      i += 1
+      entry = entry.getNext
     }
   }
 
@@ -473,7 +494,7 @@ class Queue(val host: VirtualHost, val d
         entry.flushed
       }
     }
-    
+    println("flushes done... full: "+messages.full);
     messages.refiller.run
 
   }
@@ -488,11 +509,17 @@ object QueueEntry extends Sizer[QueueEnt
 class QueueEntry(val queue:Queue) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
   import QueueEntry._
 
-  var seq: Long = -1L
+  var seq: Long = 0L
   var competing:List[Subscription] = Nil
   var browsing:List[Subscription] = Nil
+  var prefetched = 0
+
   var value:EntryType = null
 
+  override def toString = {
+    "{seq: "+seq+", prefetched: "+prefetched+", value: "+value+", competing: "+competing+", browsing: "+browsing+"}"
+  }
+
   def createQueueEntryRecord = {
     val qer = new QueueEntryRecord
     qer.queueKey = queue.storeId
@@ -509,6 +536,8 @@ class QueueEntry(val queue:Queue) extend
   def created(seq:Long, delivery:Delivery) = {
     this.seq = seq
     this.value = new Loaded(delivery)
+
+    (browsing:::competing).foreach { sub => sub.addPrefetch(this) }
     this
   }
 
@@ -530,8 +559,19 @@ class QueueEntry(val queue:Queue) extend
   }
 
   def tombstone = {
+
+    // remove from prefetch counters..
+    var cur = this;
+    while( prefetched > 0 ) {
+      if( cur.hasSubs ) {
+        (cur.browsing ::: cur.competing).foreach { sub => if( sub.prefetched(cur) ) { sub.removePrefetch(cur) } }
+      }
+      cur = cur.getPrevious
+    }
+
+
     this.value = new Tombstone()
-    if( seq != -1L ) {
+    if( seq != 0L ) {
 
       def merge(lv:QueueEntry, rv:QueueEntry):Boolean = {
         if( lv==null || rv==null) {
@@ -639,8 +679,8 @@ class QueueEntry(val queue:Queue) extend
     def asFlushed:Flushed = null
     def asLoaded:Loaded = null
 
-    def flush:Int = 0
-    def load:Int = 0
+    def flush = {}
+    def load = {}
     def isFlushedOrFlushing = false
   }
 
@@ -661,7 +701,9 @@ class QueueEntry(val queue:Queue) extend
       competing = Nil
       p
     }
-    
+
+    override  def toString = { "ts:{ count: "+count+"}" }
+
   }
 
   class Flushed(val ref:Long, val size:Int) extends EntryType {
@@ -672,11 +714,13 @@ class QueueEntry(val queue:Queue) extend
 
     override def isFlushedOrFlushing = true
 
+    override def toString = { "flushed:{ loading: "+loading+", size:"+size+"}" }
+
     // Flushed entries can't be dispatched until
     // they get loaded.
     def dispatch():QueueEntry = {
       if( !loading ) {
-        var remaining = queue.tune_subscription_prefetch - size
+        var remaining = queue.tune_subscription_buffer - size
         load
 
         // make sure the next few entries are loaded too..
@@ -694,12 +738,11 @@ class QueueEntry(val queue:Queue) extend
       null
     }
 
-    override def load():Int = {
-      if( loading ) {
-        0
-      } else {
+    override def load() = {
+      if( !loading ) {
         // start loading it back...
         loading = true
+        queue.loadingSize += size
         queue.host.store.loadMessage(ref) { delivery =>
           // pass off to a source so it can aggregate multiple
           // loads to reduce cross thread synchronization
@@ -707,7 +750,6 @@ class QueueEntry(val queue:Queue) extend
             queue.store_load_source.merge((QueueEntry.this, delivery.get))
           }
         }
-        size
       }
     }
   }
@@ -719,6 +761,8 @@ class QueueEntry(val queue:Queue) extend
     def size = delivery.size
     var flushing = false
     
+    override def toString = { "loaded:{ flushing: "+flushing+", aquired: "+aquired+", size:"+size+"}" }
+
     override def isFlushedOrFlushing = {
       flushing
     }
@@ -734,11 +778,10 @@ class QueueEntry(val queue:Queue) extend
       }
     }
 
-    override def flush():Int = {
-      if( flushing ) {
-        0
-      } else {
+    override def flush():Unit = {
+      if( !flushing ) {
         flushing=true
+        queue.flushingSize+=size
 
         if( delivery.storeBatch!=null ) {
           delivery.storeBatch.eagerFlush(^{
@@ -750,14 +793,12 @@ class QueueEntry(val queue:Queue) extend
             queue.store_flush_source.merge(QueueEntry.this)
           }
         }
-
-        size
       }
     }
 
     def dispatch():QueueEntry = {
       if( delivery==null ) {
-        // can't dispatch untill the delivery is set.
+        // can't dispatch until the delivery is set.
         null
       } else {
 
@@ -829,6 +870,14 @@ class QueueEntry(val queue:Queue) extend
           val p = nextEntry
           p.addBrowsing(browsingFastSubs)
           p.addCompeting(competingFastSubs)
+
+
+          // if we are no longer needed and we are under pressure to make room and the previous was flushed....
+          if( !hasSubs && prefetched==0 && !aquired && queue.messages.full && getPrevious.isFlushedOrFlushing ) {
+            // then flush out to make space...
+            flush
+          }
+
           p
         } else {
           null
@@ -850,14 +899,89 @@ class Subscription(queue:Queue) extends 
   var session: DeliverySession = null
   var pos:QueueEntry = null
 
+  var cursoredCounter = 0L
+
+  // Vars used to detect slow consumers.
+  var prevCursoredCounter = 0L
+  var tailParkings = 0
+  var slowIntervals = 0
+
+  def slow = slowIntervals > queue.tune_max_slow_intervals
+
+  var lastPrefetchPos:QueueEntry = null
+  var prefetchSize = 0
+
+
+  override def toString = "{ prefetchSize: "+prefetchSize+", pos: "+(if(pos==null) null else pos.seq)+" lastPrefetchPos: "+(if(lastPrefetchPos==null) null else lastPrefetchPos.seq)+" }"
+
   def position(value:QueueEntry):Unit = {
+    if( value!=null ) {
+      // setting a new position..
+      if( pos!=null && pos.value!=null ) {
+        // Remove the previous pos from the prefetch counters.
+        removePrefetch(pos)
+        cursoredCounter += pos.size
+      }
+    } else {
+      // setting null pos, happens when the sub is closed.
+      if( lastPrefetchPos!=null ) {
+        var cur = pos
+
+        // clean up its prefetch counters on the entries..
+        while( cur!=null && cur.value!=null ) {
+          cur.prefetched -= 1
+          cur = if( cur == lastPrefetchPos ) {
+            null
+          } else {
+            cur.nextEntry
+          }
+        }
+        lastPrefetchPos = null
+        prefetchSize=0
+      }
+    }
     pos = value
     session.refiller = pos
+    if( tailParked ) {
+      tailParkings += 1
+    }
+  }
+
+
+  def prefetched(value:QueueEntry) = {
+    pos.seq <= value.seq && value.seq <= lastPrefetchPos.seq
   }
 
+  def removePrefetch(value:QueueEntry):Unit = {
+//    println("prefetch rm: "+value.seq)
+    value.prefetched -= 1
+    prefetchSize -= value.size
+    fillPrefetch()
+  }
+
+  def fillPrefetch() = {
+    // attempts to fill the prefetch...
+    var next = lastPrefetchPos.getNext
+    while(prefetchSize < queue.tune_subscription_buffer && next!=null && next.value!=null ) {
+      next.load
+      addPrefetch(next)
+      next = next.getNext
+    }
+  }
+
+  def addPrefetch(value:QueueEntry):Unit = {
+//    println("prefetch add: "+value.seq)
+    prefetchSize += value.size
+    lastPrefetchPos = value
+    value.prefetched += 1
+  }
+
+  def tailParked = pos eq queue.tailEntry
+
   def connect(consumer: DeliveryConsumer) = {
     session = consumer.connect(this)
     queue.headEntry.addCompeting(this :: Nil)
+    addPrefetch(queue.headEntry)
     queue.dispatchQueue << queue.headEntry
   }