You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:59:34 UTC

svn commit: r961116 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/

Author: chirino
Date: Wed Jul  7 03:59:33 2010
New Revision: 961116

URL: http://svn.apache.org/viewvc?rev=961116&view=rev
Log:
making progress towards paging of messages.

Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961116&r1=961115&r2=961116&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:59:33 2010
@@ -28,7 +28,7 @@ import org.fusesource.hawtbuf._
 trait DeliveryProducer {
 
   def dispatchQueue:DispatchQueue
-  def ack(message:Delivery) = {}
+  def ack(value:Any) = {}
 
   def collocate(value:DispatchQueue):Unit = {
     if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
@@ -152,9 +152,9 @@ class Delivery extends BaseRetained {
   def copy() = (new Delivery).set(this)
 
   /**
-   * Does the producer require this message delivery to be ack?
+   * Set if the producer requires an ack to be sent back
    */
-  var ack = false
+  var ack:Any = null
 
   def set(other:Delivery) = {
     size = other.size

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961116&r1=961115&r2=961116&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 03:59:33 2010
@@ -17,10 +17,13 @@
 package org.apache.activemq.apollo.broker
 
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import java.util.{LinkedList}
 import org.apache.activemq.util.TreeMap
 import collection.{SortedMap}
 import org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue, BaseRetained}
+import org.apache.activemq.util.TreeMap.TreeEntry
+import java.util.{Collections, ArrayList, LinkedList}
+import org.apache.activemq.util.list.LinkedNode
+import org.apache.activemq.util.list.LinkedNodeList
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -45,7 +48,6 @@ trait QueueLifecyleListener {
 
 
 object Queue extends Log {
-  val maxOutboundSize = 1024 * 1204 * 5
 }
 
 /**
@@ -75,93 +77,154 @@ class Queue(val host: VirtualHost, val d
   var message_seq_counter = 0L
   var count = 0
 
-  var maxSize: Int = 1024 * 32
 
-  val headEntry = new PagedEntry(this)
-  headEntry.seq = -1
-  var tailEntry = new PagedEntry(this)
-
-  object messages extends Sink[PagedEntry] {
-    var counter = 0
-    val entries = new TreeMap[Long, PagedEntry]()
-    entries.put(headEntry.seq, headEntry)
+  val headEntry = new QueueEntry(this).tombstone
+  var tailEntry = new QueueEntry(this)
 
-    private var size = 0
+  var counter = 0
+  val entries = new LinkedNodeList[QueueEntry]()
+  entries.addFirst(headEntry)
+
+  /**
+   * Tuning options.
+   */
+  var tune_max_size = 1024 * 32
+  var tune_subscription_prefetch = 1024*32
+  var tune_max_outbound_size = 1024 * 1204 * 5
+
+  private var size = 0
+
+  def swap() = {
+
+    class Prio(val entry:QueueEntry) extends Comparable[Prio] {
+      var value = 0
+      def compareTo(o: Prio) = o.value - value
+    }
+
+    val prios = new ArrayList[Prio](count)
+
+    var entry = entries.getHead
+    while( entry!=null ) {
+      if( entry.value.asTombstone == null ) {
+        prios.add(new Prio(entry))
+      }
+      entry = entry.getNext
+    }
+
+
+    /**
+     * Adds keep priority to the range of entries starting at index i
+     * and spanning the size provided; priority drops by one per entry.
+     */
+    def prioritize(i:Int, size:Int, p:Int):Unit = {
+      val prio = prios.get(i)
+      prio.value += p
+      val remainingSize = size - prio.entry.value.size
+      if( remainingSize > 0 ) {
+        val next = i + 1
+        if( next < prios.size ) {
+          prioritize(next, remainingSize, p-1)
+        }
+      }
+    }
+
+    // Prioritize the entries so that higher priority entries are swapped in,
+    // and lower priority entries are swapped out.
+    var i = 0
+    while( i < prios.size ) {
+      val prio = prios.get(i)
+      if( prio.entry.hasSubs ) {
+
+        var credits =0;
+        if( prio.entry.competing != Nil) {
+          credits += prio.entry.competing.size * tune_subscription_prefetch
+        } else{
+          if( prio.entry.browsing != Nil ) {
+            credits += tune_subscription_prefetch
+          }
+        }
+        prioritize(i, credits, 1000)
+
+      }
+      i += 1
+    }
+
+    Collections.sort(prios)
+
+    var remaining = tune_max_size / 2
+    i = 0
+    while( i < prios.size ) {
+      val prio = prios.get(i)
+      val entry = prio.entry
+      if( remaining > 0 ) {
+        remaining -= entry.value.size
+      }
+
+      i += 1
+    }
+
+  }
+
+  object messages extends Sink[QueueEntry] {
     var refiller: Runnable = null
 
-    def full = size >= maxSize
+    def full = size >= tune_max_size
 
-    def offer(value: PagedEntry): Boolean = {
+    def offer(value: QueueEntry): Boolean = {
 
       if (full) {
         false
       } else {
 
-        val ref = value.delivery.ref
+        val ref = value.value.ref
         if (ref != null) {
           host.database.addMessageToQueue(storeId, value.seq, ref)
           ref.release
         }
 
-        size += value.delivery.size
-        entries.put(value.seq, value)
+        size += value.value.size
+        entries.addLast(value)
         counter += 1;
 
-        if( !value.isEmpty ) {
+//        if( full ) {
+//          swap
+//        }
+
+        if( value.hasSubs ) {
           value.dispatch
         }
         true
       }
     }
+  }
 
-    def ack(value: PagedEntry) = {
-
-      if (value.delivery.ref != null) {
-        host.database.removeMessageFromQueue(storeId, value.seq, null)
-      }
-
-      counter -= 1
-      size -= value.delivery.size
-
-      value.delivery = null
-
-      // acked entries turn into a tombstone entry..  adjacent tombstones
-      // aggregate into a single entry.
-      var current = entries.getEntry(value.seq)
-      assert(current != null)
+  def ack(entry: QueueEntry) = {
 
-      // Merge /w previous if possible
-      var adj = current.previous
-      if (adj.getValue.mergeTomestone(current.getValue)) {
-        entries.removeEntry(current)
-        current = adj
-      }
+    if (entry.value.ref != null) {
+      host.database.removeMessageFromQueue(storeId, entry.seq, null)
+    }
 
-      // Merge /w next if possible
-      adj = current.next
-      if (adj != null && current.getValue.mergeTomestone(adj.getValue)) {
-        entries.removeEntry(adj)
-      }
+    counter -= 1
+    size -= entry.value.size
 
+    entry.tombstone
 
-      if (counter == 0) {
-        refiller.run
-      }
+    if (counter == 0) {
+      messages.refiller.run
     }
+  }
 
 
-    def nack(values: List[PagedEntry]) = {
-      for (v <- values) {
-        v.unaquire;
-        // TODO: re-dispatch em.
-      }
+  def nack(values: List[QueueEntry]) = {
+    // TODO:
+    for (v <- values) {
     }
-
   }
+  
 
   val session_manager = new SinkMux[Delivery](MapSink(messages) {x => accept(x)}, dispatchQueue, Delivery)
 
-  val ack_source = createSource(new ListEventAggregator[(Subscription, Delivery)](), dispatchQueue)
+  val ack_source = createSource(new ListEventAggregator[(Subscription, QueueEntry)](), dispatchQueue)
   ack_source.setEventHandler(^ {drain_acks});
   ack_source.resume
 
@@ -269,13 +332,9 @@ class Queue(val host: VirtualHost, val d
    * know the order.  Converts the delivery to a QueueEntry
    */
   private def accept(delivery: Delivery) = {
-    val d = delivery.copy
-    d.ack = true
     val rc = tailEntry
-    tailEntry = new PagedEntry(this)
-    rc.seq = next_message_seq
-    rc.delivery = d
-    rc
+    tailEntry = new QueueEntry(this)
+    rc.loaded(next_message_seq, delivery)
   }
 
 
@@ -288,58 +347,75 @@ class Queue(val host: VirtualHost, val d
 
 }
 
-object PagedEntry extends Sizer[PagedEntry] {
-  def size(value: PagedEntry): Int = value.delivery.size
 
+object QueueEntry extends Sizer[QueueEntry] {
+  def size(value: QueueEntry): Int = value.value.size
 }
-class PagedEntry(val queue:Queue) extends Comparable[PagedEntry] with Runnable {
-  def compareTo(o: PagedEntry) = {
-    (seq - o.seq).toInt
-  }
 
-  var delivery: Delivery = null
-  var seq: Long = -1
-  var count: Long = 1
-  var aquired = false
+class QueueEntry(val queue:Queue) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
+  import QueueEntry._
+
+  var seq: Long = -1L
   var competing:List[Subscription] = Nil
   var browsing:List[Subscription] = Nil
+  var value:EntryType = null
 
-  def aquire() = {
-    if (aquired) {
-      false
-    } else {
-      aquired = true
-      true
-    }
+  def compareTo(o: QueueEntry) = {
+    (seq - o.seq).toInt
   }
 
-  def unaquire() = {
-    assert(aquired)
-    aquired = false
+  def loaded(seq:Long, delivery:Delivery) = {
+    this.seq = seq
+    this.value = new Loaded(delivery)
+    this
   }
 
+  def tombstone = {
+    this.value = new Tombstone()
+    if( seq != -1L ) {
+
+      def merge(lv:QueueEntry, rv:QueueEntry):Boolean = {
+        if( lv==null || rv==null) {
+          return false
+        }
+
+        val lts = lv.value.asTombstone
+        val rts = rv.value.asTombstone
 
-  def mergeTomestone(next: PagedEntry): Boolean = {
-    if (tomestone && next.tomestone && seq + count == next.seq) {
-      count += next.count
-      if( next.browsing!=Nil || next.competing!=Nil ){
-        addBrowsing(next.browsing)
-        addCompeting(next.competing)
-        next.browsing = Nil
-        next.competing = Nil
+        if( lts==null ||  rts==null ) {
+          return false
+        }
+
+        if( lv.seq + lts.count  == rv.seq ) {
+          lts.count += rts.count
+          rts.count = 0
+
+          if( rv.browsing!=Nil || rv.competing!=Nil ){
+            lv.addBrowsing(rv.browsing)
+            lv.addCompeting(rv.competing)
+            rv.browsing = Nil
+            rv.competing = Nil
+          }
+
+          return true
+        } else {
+          return false
+        }
       }
-      true
-    } else {
-      false
-    }
-  }
 
-  def tomestone = {
-    delivery == null
+      // Merge adjacent tombstones
+      if( merge(this, getNext) ) {
+        getNext.unlink
+      }
+      if( merge(getPrevious, this) ) {
+        this.unlink
+      }
+    }
+    this
   }
 
-  def isEmpty = competing == Nil && browsing == Nil
 
+  def hasSubs = !(competing == Nil && browsing == Nil)
 
   def run() = {
     var next = dispatch()
@@ -348,90 +424,12 @@ class PagedEntry(val queue:Queue) extend
     }
   }
 
-  def dispatch():PagedEntry = {
-
-    if( this == queue.tailEntry ) {
-
-      // The tail entry does not hold data..
+  def dispatch():QueueEntry = {
+    if( value == null ) {
+      // tail entry can't be dispatched.
       null
-
-    } else if( this == queue.headEntry ) {
-
-      // The head entry does not hold any data.. so just move
-      // any assigned subs to the next entry.
-      
-      val p = nextEntry
-      p.addBrowsing(browsing)
-      p.addCompeting(competing)
-      browsing = Nil
-      competing = Nil
-      p
-
     } else {
-
-      var browsingSlowSubs:List[Subscription] = Nil
-      var browsingFastSubs:List[Subscription] = Nil
-      var competingSlowSubs:List[Subscription] = Nil
-      var competingFastSubs:List[Subscription] = Nil
-
-      if( browsing!=Nil ) {
-        browsing.foreach { sub =>
-          if (sub.matches(this)) {
-            if (sub.offer(this)) {
-              browsingFastSubs ::= sub
-            } else {
-              browsingSlowSubs ::= sub
-            }
-          } else {
-            browsingFastSubs ::= sub
-          }
-        }
-      }
-
-      if( competing!=Nil ) {
-        if (!this.aquired) {
-          this.aquire()
-
-          var picked: Subscription = null
-          var remaining = competing
-          while( remaining!=Nil && picked == null ) {
-            val sub = remaining.head
-            remaining = remaining.drop(1)
-
-            if (sub.matches(this)) {
-              competingSlowSubs = competingSlowSubs ::: sub :: Nil
-              if (sub.offer(this)) {
-                picked = sub
-              }
-            } else {
-              competingFastSubs = competingFastSubs ::: sub :: Nil
-            }
-          }
-
-          if (picked == null) {
-            this.unaquire()
-          } else {
-            competingFastSubs = remaining ::: competingFastSubs ::: competingSlowSubs
-            competingSlowSubs = Nil
-          }
-        } else {
-          competingFastSubs = competing
-        }
-      }
-
-      // The slow subs stay on this entry..
-      browsing = browsingSlowSubs
-      competing = competingSlowSubs
-
-      // the fast subs move on to the next entry...
-      if ( browsingFastSubs!=null &&  competingFastSubs!=null) {
-        val p = nextEntry
-        p.addBrowsing(browsingFastSubs)
-        p.addCompeting(competingFastSubs)
-        p
-      } else {
-        null
-      }
+      value.dispatch
     }
   }
 
@@ -455,25 +453,160 @@ class PagedEntry(val queue:Queue) extend
     competing = competing.filterNot(_ == s)
   }
 
-  def nextEntry():PagedEntry = {
-    var entry = queue.messages.entries.get(this.seq + 1)
+  def nextEntry():QueueEntry = {
+    var entry = getNext
     if (entry == null) {
       entry = queue.tailEntry
     }
     entry
   }
 
+
+  trait EntryType {
+    def size:Int
+    def dispatch():QueueEntry
+    def ref:StoredMessageRef
+
+    def asTombstone:Tombstone = null
+    def asStored:Stored = null
+    def asLoaded:Loaded = null
+  }
+
+  class Tombstone extends EntryType {
+
+    var count = 1L
+
+    def size = 0
+    def ref = null
+
+    override def asTombstone = this
+
+    def dispatch():QueueEntry = {
+      val p = nextEntry
+      p.addBrowsing(browsing)
+      p.addCompeting(competing)
+      browsing = Nil
+      competing = Nil
+      p
+    }
+    
+  }
+
+  class Stored extends EntryType {
+
+    private var loading = false
+
+    var ref:StoredMessageRef = null
+    var size = 0
+
+    override def asStored = this
+
+    // Stored entries can't be dispatched until
+    // they get loaded.
+    def dispatch():QueueEntry = {
+      null
+    }
+  }
+
+  class Loaded(val delivery: Delivery) extends EntryType {
+
+    var aquired = false
+    def ref = delivery.ref
+    def size = delivery.size
+    
+    override  def asLoaded = this
+
+    def dispatch():QueueEntry = {
+      if( delivery==null ) {
+        // can't dispatch until the delivery is set.
+        null
+      } else {
+
+        var browsingSlowSubs:List[Subscription] = Nil
+        var browsingFastSubs:List[Subscription] = Nil
+        var competingSlowSubs:List[Subscription] = Nil
+        var competingFastSubs:List[Subscription] = Nil
+
+        if( browsing!=Nil ) {
+          val offering = delivery.copy
+          offering.ack = null
+
+          browsing.foreach { sub =>
+            if (sub.matches(offering)) {
+              if (sub.offer(offering)) {
+                browsingFastSubs ::= sub
+              } else {
+                browsingSlowSubs ::= sub
+              }
+            } else {
+              browsingFastSubs ::= sub
+            }
+          }
+        }
+
+        if( competing!=Nil ) {
+
+          val offering = delivery.copy
+          offering.ack = QueueEntry.this
+
+          if (!this.aquired) {
+            aquired = true
+
+            var picked: Subscription = null
+            var remaining = competing
+            while( remaining!=Nil && picked == null ) {
+              val sub = remaining.head
+              remaining = remaining.drop(1)
+
+              if (sub.matches(offering)) {
+                competingSlowSubs = competingSlowSubs ::: sub :: Nil
+                if (sub.offer(offering)) {
+                  picked = sub
+                }
+              } else {
+                competingFastSubs = competingFastSubs ::: sub :: Nil
+              }
+            }
+
+            if (picked == null) {
+              aquired = false
+            } else {
+              competingFastSubs = remaining ::: competingFastSubs ::: competingSlowSubs
+              competingSlowSubs = Nil
+            }
+          } else {
+            competingFastSubs = competing
+          }
+        }
+
+        // The slow subs stay on this entry..
+        browsing = browsingSlowSubs
+        competing = competingSlowSubs
+
+        // the fast subs move on to the next entry...
+        if ( browsingFastSubs!=null &&  competingFastSubs!=null) {
+          val p = nextEntry
+          p.addBrowsing(browsingFastSubs)
+          p.addCompeting(competingFastSubs)
+          p
+        } else {
+          null
+        }
+      }
+    }
+  }
+
 }
 
 class Subscription(queue:Queue) extends DeliveryProducer {
 
   def dispatchQueue = queue.dispatchQueue
 
-  var dispatched = List[PagedEntry]()
+  var dispatched = List[QueueEntry]()
   var session: Session = null
-  var pos:PagedEntry = null
+  var pos:QueueEntry = null
 
-  def position(value:PagedEntry):Unit = {
+  def position(value:QueueEntry):Unit = {
     pos = value
     session.refiller = pos
   }
@@ -488,14 +621,17 @@ class Subscription(queue:Queue) extends 
     pos.removeCompeting(this)
     session.close
     session = null
-    queue.messages.nack(dispatched)
+    queue.nack(dispatched)
   }
 
-  def matches(entry:PagedEntry) = session.consumer.matches(entry.delivery)
+  def matches(entry:Delivery) = session.consumer.matches(entry)
 
-  def offer(entry:PagedEntry) = {
-    if (session.offer(entry.delivery)) {
-      dispatched = dispatched ::: entry :: Nil
+  def offer(delivery:Delivery) = {
+    if (session.offer(delivery)) {
+      if( delivery.ack!=null ) {
+        val entry = delivery.ack.asInstanceOf[QueueEntry]
+        dispatched = dispatched ::: entry :: Nil
+      }
       true
     } else {
       false
@@ -504,25 +640,23 @@ class Subscription(queue:Queue) extends 
 
   // called from the consumer thread.. send it to the ack_source
   // do that it calls _ack from the queue thread.
-  override def ack(delivery: Delivery) = {
-    queue.ack_source.merge((this, delivery))
+  override def ack(value: Any) = {
+    val entry = value.asInstanceOf[QueueEntry]
+    queue.ack_source.merge((this, entry))
   }
 
-  def _ack(delivery: Delivery): Unit = {
+  def _ack(entry: QueueEntry): Unit = {
     assert(!dispatched.isEmpty)
-    val entry = if (dispatched.head.delivery == delivery) {
+    if (dispatched.head == entry) {
       // this should be the common case...
-      val rc = dispatched.head
       dispatched = dispatched.drop(1)
-      rc
     } else {
       // but lets also handle the case where we get an ack out of order.
-      val rc = dispatched.partition(_.delivery == delivery)
+      val rc = dispatched.partition(_ == entry)
       assert(rc._1.size == 1)
       dispatched = rc._2
-      rc._1.head
     }
-    queue.messages.ack(entry)
+    queue.ack(entry)
   }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961116&r1=961115&r2=961116&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:59:33 2010
@@ -92,8 +92,8 @@ class StompProtocolHandler extends Proto
         if( session.full ) {
           false
         } else {
-          if( delivery.ack ) {
-            producer.ack(delivery)
+          if( delivery.ack!=null ) {
+            producer.ack(delivery.ack)
           }
           val frame = delivery.message.asInstanceOf[StompFrameMessage].frame
           val rc = session.offer(frame)