You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 06:10:41 UTC

svn commit: r961145 - in /activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker: Queue.scala VirtualHost.scala

Author: chirino
Date: Wed Jul  7 04:10:40 2010
New Revision: 961145

URL: http://svn.apache.org/viewvc?rev=961145&view=rev
Log:
making queue consumers more decoupled.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961145&r1=961144&r2=961145&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 04:10:40 2010
@@ -63,6 +63,7 @@ class Queue(val host: VirtualHost, val d
   import Queue._
 
   var consumerSubs = Map[DeliveryConsumer, Subscription]()
+  var fastSubs = List[Subscription]()
 
   override val dispatchQueue: DispatchQueue = createQueue(destination.toString);
   dispatchQueue.setTargetQueue(getRandomThreadQueue)
@@ -93,8 +94,8 @@ class Queue(val host: VirtualHost, val d
   // sequence numbers.. used to track what's in the store.
   var message_seq_counter = 1L
 
-  val headEntry = new QueueEntry(this).tombstone
-  var tailEntry = new QueueEntry(this)
+  val headEntry = new QueueEntry(this, 0L).tombstone
+  var tailEntry = new QueueEntry(this, next_message_seq)
 
   var counter = 0
   val entries = new LinkedNodeList[QueueEntry]()
@@ -149,7 +150,7 @@ class Queue(val host: VirtualHost, val d
     this.storeId = storeId
     if( !records.isEmpty ) {
       records.foreach { qer =>
-        val entry = new QueueEntry(Queue.this).flushed(qer)
+        val entry = new QueueEntry(Queue.this,qer.queueSeq).flushed(qer)
         entries.addLast(entry)
       }
 
@@ -180,9 +181,9 @@ class Queue(val host: VirtualHost, val d
       } else {
 
         val entry = tailEntry
-        tailEntry = new QueueEntry(Queue.this)
+        tailEntry = new QueueEntry(Queue.this, next_message_seq)
         val queueDelivery = delivery.copy
-        entry.created(next_message_seq, queueDelivery)
+        entry.created(queueDelivery)
         queueDelivery.storeBatch = delivery.storeBatch
 
         size += entry.size
@@ -196,42 +197,26 @@ class Queue(val host: VirtualHost, val d
           queueDelivery.storeBatch.enqueue(entry.createQueueEntryRecord)
         }
 
-        // do we have at least 1 subscription that is keeping up with the producers
-        // and is interested in this message?
-//        val hold = consumerSubs.valuesIterator.find( sub=> !sub.slow && sub.matches(delivery) ).isDefined
-        def haveQuickConsumer = consumerSubs.valuesIterator.find( sub=> !sub.slow ).isDefined
+//        var haveQuickConsumer = false
+//        fastSubs.foreach{ sub=>
+//          if( sub.pos.seq < entry.seq ) {
+//            haveQuickConsumer = true
+//          }
+//        }
+
+        def haveQuickConsumer = fastSubs.find( sub=> !sub.slow && sub.pos.seq <= entry.seq ).isDefined
 
         var dispatched = false
         if( entry.prefetched > 0 || haveQuickConsumer ) {
           // try to dispatch it directly...
-//          println("hold: "+delivery.message.getProperty("color"))
           entry.dispatch
-
         } else {
 //          println("flush: "+delivery.message.getProperty("color"))
           // we flush the entry out right away if it looks
           // it wont be needed.
           entry.flush
-          if( full ) {
-            println("full... waiting for flushes");
-          }
-
-          // just make it hit the disk quick.. but keep it in memory.
-          // delivery.storeBatch.eagerFlush(^{})
         }
 
-//        // Does it look like we need to start swapping to make room
-//        // for more messages?
-//        if( !dispatched && host.store!=null && full ) {
-//          val wasAt = dequeue_size
-//          dispatchQueue.dispatchAfter(tune_slow_check_interval, TimeUnit.MILLISECONDS, ^{
-//            // start swapping if was still blocked after a short delay
-//            if( dequeue_size == wasAt && full ) {
-//              swap
-//            }
-//          })
-//        }
-
         // release the store batch...
         if (queueDelivery.storeBatch != null) {
           queueDelivery.storeBatch.release
@@ -253,6 +238,21 @@ class Queue(val host: VirtualHost, val d
 
         var idleConsumerCount = 0
 
+
+//        if( consumerSubs.isEmpty ) {
+//          println("using "+size+" out of "+capacity+" buffer space.");
+//          var cur = entries.getHead
+//          while( cur!=null ) {
+//            if( cur.asLoaded!=null ) {
+//              println("  => "+cur)
+//            }
+//            cur = cur.getNext
+//          }
+//          println("tail: "+tailEntry)
+//        }
+
+        fastSubs = Nil
+
         consumerSubs.foreach{ case (consumer, sub)=>
 
           // Skip over new consumers...
@@ -285,6 +285,10 @@ class Queue(val host: VirtualHost, val d
 
             sub.tailParkings = 0
           }
+
+          if( !sub.slow ) {
+            fastSubs ::= sub
+          }
         }
 
         // Trigger a swap if we have slow consumers and we are full..
@@ -399,6 +403,7 @@ class Queue(val host: VirtualHost, val d
       val subscription = new Subscription(this)
       subscription.connect(consumer)
       consumerSubs += consumer -> subscription
+      fastSubs ::= subscription
       addCapacity( tune_subscription_buffer )
     }
   } >>: dispatchQueue
@@ -409,9 +414,11 @@ class Queue(val host: VirtualHost, val d
         case Some(cs) =>
           cs.close
           consumerSubs -= consumer
+          fastSubs = fastSubs.filterNot(_ eq cs)
           addCapacity( -tune_subscription_buffer )
         case None =>
       }
+
     }
   } >>: dispatchQueue
 
@@ -441,16 +448,19 @@ class Queue(val host: VirtualHost, val d
     }
     
     debug("swapping...")
+
     var entry = entries.getHead
     while( entry!=null ) {
-      println(entries)
       if( entry.asTombstone == null ) {
 
-        // only keep prefetch entries around..
-        if( entry.prefetched == 0 ) {
-          entry.flush
-        } else {
+        val loaded = entry.asLoaded
+
+        // Keep around prefetched and loaded entries.
+        if( entry.prefetched < 0 || (loaded!=null && loaded.aquired)) {
           entry.load
+        } else {
+          // flush the others out of memory.
+          entry.flush
         }
       }
 
@@ -494,7 +504,6 @@ class Queue(val host: VirtualHost, val d
         entry.flushed
       }
     }
-    println("flushes done... full: "+messages.full);
     messages.refiller.run
 
   }
@@ -506,15 +515,14 @@ object QueueEntry extends Sizer[QueueEnt
   def size(value: QueueEntry): Int = value.size
 }
 
-class QueueEntry(val queue:Queue) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
+class QueueEntry(val queue:Queue, val seq:Long) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
   import QueueEntry._
 
-  var seq: Long = 0L
   var competing:List[Subscription] = Nil
   var browsing:List[Subscription] = Nil
   var prefetched = 0
 
-  var value:EntryType = null
+  var value:EntryType = new Tail
 
   override def toString = {
     "{seq: "+seq+", prefetched: "+prefetched+", value: "+value+", competing: "+competing+", browsing: "+browsing+"}"
@@ -533,11 +541,9 @@ class QueueEntry(val queue:Queue) extend
     (seq - o.seq).toInt
   }
 
-  def created(seq:Long, delivery:Delivery) = {
-    this.seq = seq
-    this.value = new Loaded(delivery)
 
-    (browsing:::competing).foreach { sub => sub.addPrefetch(this) }
+  def created(delivery:Delivery) = {
+    this.value = new Loaded(delivery)
     this
   }
 
@@ -553,7 +559,6 @@ class QueueEntry(val queue:Queue) extend
   }
 
   def flushed(qer:QueueEntryRecord) = {
-    this.seq = qer.queueSeq
     this.value = new Flushed(qer.messageKey, qer.size)
     this
   }
@@ -569,23 +574,23 @@ class QueueEntry(val queue:Queue) extend
       cur = cur.getPrevious
     }
 
-
     this.value = new Tombstone()
     if( seq != 0L ) {
 
-      def merge(lv:QueueEntry, rv:QueueEntry):Boolean = {
+      def merge(lv:QueueEntry, rv:QueueEntry):Unit = {
         if( lv==null || rv==null) {
-          return false
+          return
         }
 
         val lts = lv.value.asTombstone
         val rts = rv.value.asTombstone
 
         if( lts==null ||  rts==null ) {
-          return false
+          return
         }
 
         if( lv.seq + lts.count  == rv.seq ) {
+
           lts.count += rts.count
           rts.count = 0
 
@@ -596,19 +601,13 @@ class QueueEntry(val queue:Queue) extend
             rv.competing = Nil
           }
 
-          return true
-        } else {
-          return false
+          rv.unlink
         }
       }
 
       // Merge adjacent tombstones
-      if( merge(this, getNext) ) {
-        getNext.unlink
-      }
-      if( merge(getPrevious, this) ) {
-        this.unlink
-      }
+      merge(this, getNext)
+      merge(getPrevious, this)
     }
     this
   }
@@ -642,7 +641,7 @@ class QueueEntry(val queue:Queue) extend
     competing = competing.filterNot(_ == s)
   }
 
-  def nextEntry():QueueEntry = {
+  def nextOrTail():QueueEntry = {
     var entry = getNext
     if (entry == null) {
       entry = queue.tailEntry
@@ -658,32 +657,34 @@ class QueueEntry(val queue:Queue) extend
   def asTombstone = this.value.asTombstone
   def asFlushed = this.value.asFlushed
   def asLoaded = this.value.asLoaded
+  def asTail = this.value.asTail
   def isFlushedOrFlushing = value.isFlushedOrFlushing
 
-  def dispatch():QueueEntry = {
-    if( value == null ) {
-      // tail entry can't be dispatched.
-      null
-    } else {
-      value.dispatch
-    }
-  }
-
+  def dispatch():QueueEntry = value.dispatch
 
   trait EntryType {
     def size:Int
     def dispatch():QueueEntry
     def ref:Long
 
-    def asTombstone:Tombstone = null
-    def asFlushed:Flushed = null
+    def asTail:Tail = null
     def asLoaded:Loaded = null
+    def asFlushed:Flushed = null
+    def asTombstone:Tombstone = null
 
     def flush = {}
     def load = {}
     def isFlushedOrFlushing = false
   }
 
+  class Tail extends EntryType {
+    override def asTail:Tail = this
+    def size = 0
+    def ref = -1
+
+    def dispatch():QueueEntry = null
+  }
+
   class Tombstone extends EntryType {
 
     var count = 1L
@@ -694,7 +695,7 @@ class QueueEntry(val queue:Queue) extend
     override def asTombstone = this
 
     def dispatch():QueueEntry = {
-      val p = nextEntry
+      val p = nextOrTail
       p.addBrowsing(browsing)
       p.addCompeting(competing)
       browsing = Nil
@@ -779,7 +780,7 @@ class QueueEntry(val queue:Queue) extend
     }
 
     override def flush():Unit = {
-      if( !flushing ) {
+      if( queue.host.store!=null && !flushing ) {
         flushing=true
         queue.flushingSize+=size
 
@@ -867,13 +868,13 @@ class QueueEntry(val queue:Queue) extend
 
         // the fast subs move on to the next entry...
         if ( browsingFastSubs!=null &&  competingFastSubs!=null) {
-          val p = nextEntry
+          val p = nextOrTail
           p.addBrowsing(browsingFastSubs)
           p.addCompeting(competingFastSubs)
 
-
-          // if we are no longer needed and we are under pressure to make room and the previous was flushed....
-          if( !hasSubs && prefetched==0 && !aquired && queue.messages.full && getPrevious.isFlushedOrFlushing ) {
+          // flush this entry out if it's not going to be needed soon.
+          def haveQuickConsumer = queue.fastSubs.find( sub=> !sub.slow && sub.pos.seq <= seq ).isDefined
+          if( !hasSubs && prefetched==0 && !aquired && !haveQuickConsumer ) {
             // then flush out to make space...
             flush
           }
@@ -891,7 +892,8 @@ class QueueEntry(val queue:Queue) extend
 
 class LinkedQueueEntry(val value:QueueEntry) extends LinkedNode[LinkedQueueEntry]
 
-class Subscription(queue:Queue) extends DeliveryProducer {
+class Subscription(queue:Queue) extends DeliveryProducer with DispatchLogging {
+  override protected def log = Queue
 
   def dispatchQueue = queue.dispatchQueue
 
@@ -908,36 +910,28 @@ class Subscription(queue:Queue) extends 
 
   def slow = slowIntervals > queue.tune_max_slow_intervals
 
-  var lastPrefetchPos:QueueEntry = null
+  var nextPrefetchPos:QueueEntry = null
   var prefetchSize = 0
 
 
-  override def toString = "{ prefetchSize: "+prefetchSize+", pos: "+(if(pos==null) null else pos.seq)+" lastPrefetchPos: "+(if(lastPrefetchPos==null) null else lastPrefetchPos.seq)+" }"
+  override def toString = "{ prefetchSize: "+prefetchSize+", pos: "+(if(pos==null) null else pos.seq)+" nextPrefetchPos: "+(if(nextPrefetchPos==null) null else nextPrefetchPos.seq)+" }"
 
   def position(value:QueueEntry):Unit = {
     if( value!=null ) {
       // setting a new position..
-      if( pos!=null && pos.value!=null ) {
+      if( pos!=null ) {
         // Remove the previous pos from the prefetch counters.
         removePrefetch(pos)
         cursoredCounter += pos.size
       }
     } else {
       // setting null pos, happens when the sub is closed.
-      if( lastPrefetchPos!=null ) {
-        var cur = pos
+      var cur = pos
 
-        // clean up it's prefetch counters on the entries..
-        while( cur!=null && cur.value!=null ) {
-          cur.prefetched -= 1
-          cur = if( cur == lastPrefetchPos ) {
-            null
-          } else {
-            cur.nextEntry
-          }
-        }
-        lastPrefetchPos = null
-        prefetchSize=0
+      // clean up its prefetch counters on the entries..
+      while( cur!=nextPrefetchPos ) {
+        cur.prefetched -= 1
+        cur = cur.nextOrTail
       }
     }
     pos = value
@@ -949,11 +943,11 @@ class Subscription(queue:Queue) extends 
 
 
   def prefetched(value:QueueEntry) = {
-    pos.seq <= value.seq && value.seq <= lastPrefetchPos.seq
+    pos.seq <= value.seq && value.seq < nextPrefetchPos.seq
   }
 
   def removePrefetch(value:QueueEntry):Unit = {
-//    println("prefetch rm: "+value.seq)
+//    trace("prefetch rm: "+value.seq)
     value.prefetched -= 1
     prefetchSize -= value.size
     fillPrefetch()
@@ -961,27 +955,25 @@ class Subscription(queue:Queue) extends 
 
   def fillPrefetch() = {
     // attempts to fill the prefetch...
-    var next = lastPrefetchPos.getNext
-    while(prefetchSize < queue.tune_subscription_buffer && next!=null && next.value!=null ) {
-      next.load
-      addPrefetch(next)
-      next = next.getNext
+    while(prefetchSize < queue.tune_subscription_buffer && nextPrefetchPos.asTail==null ) {
+      addPrefetch(nextPrefetchPos)
     }
   }
 
   def addPrefetch(value:QueueEntry):Unit = {
-//    println("prefetch add: "+value.seq)
+//    trace("prefetch add: "+value.seq)
     prefetchSize += value.size
-    lastPrefetchPos = value
     value.prefetched += 1
+    value.load
+    nextPrefetchPos = value.nextOrTail
   }
 
   def tailParked = pos eq queue.tailEntry
 
   def connect(consumer: DeliveryConsumer) = {
     session = consumer.connect(this)
-    queue.headEntry.addCompeting(this :: Nil)
     addPrefetch(queue.headEntry)
+    queue.headEntry.addCompeting(this :: Nil)
     queue.dispatchQueue << queue.headEntry
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961145&r1=961144&r2=961145&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 04:10:40 2010
@@ -126,9 +126,8 @@ class VirtualHost(val broker: Broker) ex
   override protected def _start(onCompleted:Runnable):Unit = {
     val tracker = new LoggingTracker("virtual host startup", dispatchQueue)
     store = StoreFactory.create(config.store)
-    
-    store.configure(config.store, this)
     if( store!=null ) {
+      store.configure(config.store, this)
       val task = tracker.task("store startup")
       store.start(^{
         if( config.purgeOnStartup ) {