You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:59:34 UTC
svn commit: r961116 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Author: chirino
Date: Wed Jul 7 03:59:33 2010
New Revision: 961116
URL: http://svn.apache.org/viewvc?rev=961116&view=rev
Log:
making progress towards paging of messages.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961116&r1=961115&r2=961116&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:59:33 2010
@@ -28,7 +28,7 @@ import org.fusesource.hawtbuf._
trait DeliveryProducer {
def dispatchQueue:DispatchQueue
- def ack(message:Delivery) = {}
+ def ack(value:Any) = {}
def collocate(value:DispatchQueue):Unit = {
if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
@@ -152,9 +152,9 @@ class Delivery extends BaseRetained {
def copy() = (new Delivery).set(this)
/**
- * Does the producer require this message delivery to be ack?
+ * Set if the producer requires an ack to be sent back
*/
- var ack = false
+ var ack:Any = null
def set(other:Delivery) = {
size = other.size
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961116&r1=961115&r2=961116&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 03:59:33 2010
@@ -17,10 +17,13 @@
package org.apache.activemq.apollo.broker
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import java.util.{LinkedList}
import org.apache.activemq.util.TreeMap
import collection.{SortedMap}
import org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue, BaseRetained}
+import org.apache.activemq.util.TreeMap.TreeEntry
+import java.util.{Collections, ArrayList, LinkedList}
+import org.apache.activemq.util.list.LinkedNode
+import org.apache.activemq.util.list.LinkedNodeList
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -45,7 +48,6 @@ trait QueueLifecyleListener {
object Queue extends Log {
- val maxOutboundSize = 1024 * 1204 * 5
}
/**
@@ -75,93 +77,154 @@ class Queue(val host: VirtualHost, val d
var message_seq_counter = 0L
var count = 0
- var maxSize: Int = 1024 * 32
- val headEntry = new PagedEntry(this)
- headEntry.seq = -1
- var tailEntry = new PagedEntry(this)
-
- object messages extends Sink[PagedEntry] {
- var counter = 0
- val entries = new TreeMap[Long, PagedEntry]()
- entries.put(headEntry.seq, headEntry)
+ val headEntry = new QueueEntry(this).tombstone
+ var tailEntry = new QueueEntry(this)
- private var size = 0
+ var counter = 0
+ val entries = new LinkedNodeList[QueueEntry]()
+ entries.addFirst(headEntry)
+
+ /**
+ * Tunning options.
+ */
+ var tune_max_size = 1024 * 32
+ var tune_subscription_prefetch = 1024*32
+ var tune_max_outbound_size = 1024 * 1204 * 5
+
+ private var size = 0
+
+ def swap() = {
+
+ class Prio(val entry:QueueEntry) extends Comparable[Prio] {
+ var value = 0
+ def compareTo(o: Prio) = o.value - value
+ }
+
+ val prios = new ArrayList[Prio](count)
+
+ var entry = entries.getHead
+ while( entry!=null ) {
+ if( entry.value.asTombstone == null ) {
+ prios.add(new Prio(entry))
+ }
+ entry = entry.getNext
+ }
+
+
+ /**
+ * adds keep priority to the range of entries starting at x
+ * and spanning the size provided.
+ */
+ def prioritize(i:Int, size:Int, p:Int):Unit = {
+ val prio = prios.get(i)
+ prio.value += p
+ val remainingSize = size - prio.entry.value.size
+ if( remainingSize > 0 ) {
+ val next = i + 1
+ if( next < prios.size ) {
+ prioritize(next, remainingSize, p-1)
+ }
+ }
+ }
+
+ // Prioritize the entries so that higher priority entries are swapped in,
+ // and lower priority entries are swapped out.
+ var i = 0
+ while( i < prios.size ) {
+ val prio = prios.get(i)
+ if( prio.entry.hasSubs ) {
+
+ var credits =0;
+ if( prio.entry.competing != Nil) {
+ credits += prio.entry.competing.size * tune_subscription_prefetch
+ } else{
+ if( prio.entry.browsing != Nil ) {
+ credits += tune_subscription_prefetch
+ }
+ }
+ prioritize(i, credits, 1000)
+
+ }
+ i += 1
+ }
+
+ Collections.sort(prios)
+
+ var remaining = tune_max_size / 2
+ i = 0
+ while( i < prios.size ) {
+ val prio = prios.get(i)
+ val entry = prio.entry
+ if( remaining > 0 ) {
+ remaining -= entry.value.size
+ }
+
+ i += 1
+ }
+
+ }
+
+ object messages extends Sink[QueueEntry] {
var refiller: Runnable = null
- def full = size >= maxSize
+ def full = size >= tune_max_size
- def offer(value: PagedEntry): Boolean = {
+ def offer(value: QueueEntry): Boolean = {
if (full) {
false
} else {
- val ref = value.delivery.ref
+ val ref = value.value.ref
if (ref != null) {
host.database.addMessageToQueue(storeId, value.seq, ref)
ref.release
}
- size += value.delivery.size
- entries.put(value.seq, value)
+ size += value.value.size
+ entries.addLast(value)
counter += 1;
- if( !value.isEmpty ) {
+// if( full ) {
+// swap
+// }
+
+ if( value.hasSubs ) {
value.dispatch
}
true
}
}
+ }
- def ack(value: PagedEntry) = {
-
- if (value.delivery.ref != null) {
- host.database.removeMessageFromQueue(storeId, value.seq, null)
- }
-
- counter -= 1
- size -= value.delivery.size
-
- value.delivery = null
-
- // acked entries turn into a tombstone entry.. adjacent tombstones
- // aggregate into a single entry.
- var current = entries.getEntry(value.seq)
- assert(current != null)
+ def ack(entry: QueueEntry) = {
- // Merge /w previous if possible
- var adj = current.previous
- if (adj.getValue.mergeTomestone(current.getValue)) {
- entries.removeEntry(current)
- current = adj
- }
+ if (entry.value.ref != null) {
+ host.database.removeMessageFromQueue(storeId, entry.seq, null)
+ }
- // Merge /w next if possible
- adj = current.next
- if (adj != null && current.getValue.mergeTomestone(adj.getValue)) {
- entries.removeEntry(adj)
- }
+ counter -= 1
+ size -= entry.value.size
+ entry.tombstone
- if (counter == 0) {
- refiller.run
- }
+ if (counter == 0) {
+ messages.refiller.run
}
+ }
- def nack(values: List[PagedEntry]) = {
- for (v <- values) {
- v.unaquire;
- // TODO: re-dispatch em.
- }
+ def nack(values: List[QueueEntry]) = {
+ // TODO:
+ for (v <- values) {
}
-
}
+
val session_manager = new SinkMux[Delivery](MapSink(messages) {x => accept(x)}, dispatchQueue, Delivery)
- val ack_source = createSource(new ListEventAggregator[(Subscription, Delivery)](), dispatchQueue)
+ val ack_source = createSource(new ListEventAggregator[(Subscription, QueueEntry)](), dispatchQueue)
ack_source.setEventHandler(^ {drain_acks});
ack_source.resume
@@ -269,13 +332,9 @@ class Queue(val host: VirtualHost, val d
* know the order. Converts the delivery to a QueueEntry
*/
private def accept(delivery: Delivery) = {
- val d = delivery.copy
- d.ack = true
val rc = tailEntry
- tailEntry = new PagedEntry(this)
- rc.seq = next_message_seq
- rc.delivery = d
- rc
+ tailEntry = new QueueEntry(this)
+ rc.loaded(next_message_seq, delivery)
}
@@ -288,58 +347,75 @@ class Queue(val host: VirtualHost, val d
}
-object PagedEntry extends Sizer[PagedEntry] {
- def size(value: PagedEntry): Int = value.delivery.size
+object QueueEntry extends Sizer[QueueEntry] {
+ def size(value: QueueEntry): Int = value.value.size
}
-class PagedEntry(val queue:Queue) extends Comparable[PagedEntry] with Runnable {
- def compareTo(o: PagedEntry) = {
- (seq - o.seq).toInt
- }
- var delivery: Delivery = null
- var seq: Long = -1
- var count: Long = 1
- var aquired = false
+class QueueEntry(val queue:Queue) extends LinkedNode[QueueEntry] with Comparable[QueueEntry] with Runnable {
+ import QueueEntry._
+
+ var seq: Long = -1L
var competing:List[Subscription] = Nil
var browsing:List[Subscription] = Nil
+ var value:EntryType = null
- def aquire() = {
- if (aquired) {
- false
- } else {
- aquired = true
- true
- }
+ def compareTo(o: QueueEntry) = {
+ (seq - o.seq).toInt
}
- def unaquire() = {
- assert(aquired)
- aquired = false
+ def loaded(seq:Long, delivery:Delivery) = {
+ this.seq = seq
+ this.value = new Loaded(delivery)
+ this
}
+ def tombstone = {
+ this.value = new Tombstone()
+ if( seq != -1L ) {
+
+ def merge(lv:QueueEntry, rv:QueueEntry):Boolean = {
+ if( lv==null || rv==null) {
+ return false
+ }
+
+ val lts = lv.value.asTombstone
+ val rts = rv.value.asTombstone
- def mergeTomestone(next: PagedEntry): Boolean = {
- if (tomestone && next.tomestone && seq + count == next.seq) {
- count += next.count
- if( next.browsing!=Nil || next.competing!=Nil ){
- addBrowsing(next.browsing)
- addCompeting(next.competing)
- next.browsing = Nil
- next.competing = Nil
+ if( lts==null || rts==null ) {
+ return false
+ }
+
+ if( lv.seq + lts.count == rv.seq ) {
+ lts.count += rts.count
+ rts.count = 0
+
+ if( rv.browsing!=Nil || rv.competing!=Nil ){
+ lv.addBrowsing(rv.browsing)
+ lv.addCompeting(rv.competing)
+ rv.browsing = Nil
+ rv.competing = Nil
+ }
+
+ return true
+ } else {
+ return false
+ }
}
- true
- } else {
- false
- }
- }
- def tomestone = {
- delivery == null
+ // Merge adjacent tombstones
+ if( merge(this, getNext) ) {
+ getNext.unlink
+ }
+ if( merge(getPrevious, this) ) {
+ this.unlink
+ }
+ }
+ this
}
- def isEmpty = competing == Nil && browsing == Nil
+ def hasSubs = !(competing == Nil && browsing == Nil)
def run() = {
var next = dispatch()
@@ -348,90 +424,12 @@ class PagedEntry(val queue:Queue) extend
}
}
- def dispatch():PagedEntry = {
-
- if( this == queue.tailEntry ) {
-
- // The tail entry does not hold data..
+ def dispatch():QueueEntry = {
+ if( value == null ) {
+ // tail entry can't be dispatched.
null
-
- } else if( this == queue.headEntry ) {
-
- // The head entry does not hold any data.. so just move
- // any assigned subs to the next entry.
-
- val p = nextEntry
- p.addBrowsing(browsing)
- p.addCompeting(competing)
- browsing = Nil
- competing = Nil
- p
-
} else {
-
- var browsingSlowSubs:List[Subscription] = Nil
- var browsingFastSubs:List[Subscription] = Nil
- var competingSlowSubs:List[Subscription] = Nil
- var competingFastSubs:List[Subscription] = Nil
-
- if( browsing!=Nil ) {
- browsing.foreach { sub =>
- if (sub.matches(this)) {
- if (sub.offer(this)) {
- browsingFastSubs ::= sub
- } else {
- browsingSlowSubs ::= sub
- }
- } else {
- browsingFastSubs ::= sub
- }
- }
- }
-
- if( competing!=Nil ) {
- if (!this.aquired) {
- this.aquire()
-
- var picked: Subscription = null
- var remaining = competing
- while( remaining!=Nil && picked == null ) {
- val sub = remaining.head
- remaining = remaining.drop(1)
-
- if (sub.matches(this)) {
- competingSlowSubs = competingSlowSubs ::: sub :: Nil
- if (sub.offer(this)) {
- picked = sub
- }
- } else {
- competingFastSubs = competingFastSubs ::: sub :: Nil
- }
- }
-
- if (picked == null) {
- this.unaquire()
- } else {
- competingFastSubs = remaining ::: competingFastSubs ::: competingSlowSubs
- competingSlowSubs = Nil
- }
- } else {
- competingFastSubs = competing
- }
- }
-
- // The slow subs stay on this entry..
- browsing = browsingSlowSubs
- competing = competingSlowSubs
-
- // the fast subs move on to the next entry...
- if ( browsingFastSubs!=null && competingFastSubs!=null) {
- val p = nextEntry
- p.addBrowsing(browsingFastSubs)
- p.addCompeting(competingFastSubs)
- p
- } else {
- null
- }
+ value.dispatch
}
}
@@ -455,25 +453,160 @@ class PagedEntry(val queue:Queue) extend
competing = competing.filterNot(_ == s)
}
- def nextEntry():PagedEntry = {
- var entry = queue.messages.entries.get(this.seq + 1)
+ def nextEntry():QueueEntry = {
+ var entry = getNext
if (entry == null) {
entry = queue.tailEntry
}
entry
}
+
+ trait EntryType {
+ def size:Int
+ def dispatch():QueueEntry
+ def ref:StoredMessageRef
+
+ def asTombstone:Tombstone = null
+ def asStored:Stored = null
+ def asLoaded:Loaded = null
+ }
+
+ class Tombstone extends EntryType {
+
+ var count = 1L
+
+ def size = 0
+ def ref = null
+
+ override def asTombstone = this
+
+ def dispatch():QueueEntry = {
+ val p = nextEntry
+ p.addBrowsing(browsing)
+ p.addCompeting(competing)
+ browsing = Nil
+ competing = Nil
+ p
+ }
+
+ }
+
+ class Stored extends EntryType {
+
+ private var loading = false
+
+ var ref:StoredMessageRef = null
+ var size = 0
+
+ override def asStored = this
+
+ // Stored entries can't be dispatched until
+ // they get loaded.
+ def dispatch():QueueEntry = {
+ null
+ }
+ }
+
+ class Loaded(val delivery: Delivery) extends EntryType {
+
+ var aquired = false
+ def ref = delivery.ref
+ def size = delivery.size
+
+ override def asLoaded = this
+
+ def dispatch():QueueEntry = {
+ if( delivery==null ) {
+ // can't dispatch untill the delivery is set.
+ null
+ } else {
+
+ var browsingSlowSubs:List[Subscription] = Nil
+ var browsingFastSubs:List[Subscription] = Nil
+ var competingSlowSubs:List[Subscription] = Nil
+ var competingFastSubs:List[Subscription] = Nil
+
+ if( browsing!=Nil ) {
+ val offering = delivery.copy
+ offering.ack = null
+
+ browsing.foreach { sub =>
+ if (sub.matches(offering)) {
+ if (sub.offer(offering)) {
+ browsingFastSubs ::= sub
+ } else {
+ browsingSlowSubs ::= sub
+ }
+ } else {
+ browsingFastSubs ::= sub
+ }
+ }
+ }
+
+ if( competing!=Nil ) {
+
+ val offering = delivery.copy
+ offering.ack = QueueEntry.this
+
+ if (!this.aquired) {
+ aquired = true
+
+ var picked: Subscription = null
+ var remaining = competing
+ while( remaining!=Nil && picked == null ) {
+ val sub = remaining.head
+ remaining = remaining.drop(1)
+
+ if (sub.matches(offering)) {
+ competingSlowSubs = competingSlowSubs ::: sub :: Nil
+ if (sub.offer(offering)) {
+ picked = sub
+ }
+ } else {
+ competingFastSubs = competingFastSubs ::: sub :: Nil
+ }
+ }
+
+ if (picked == null) {
+ aquired = false
+ } else {
+ competingFastSubs = remaining ::: competingFastSubs ::: competingSlowSubs
+ competingSlowSubs = Nil
+ }
+ } else {
+ competingFastSubs = competing
+ }
+ }
+
+ // The slow subs stay on this entry..
+ browsing = browsingSlowSubs
+ competing = competingSlowSubs
+
+ // the fast subs move on to the next entry...
+ if ( browsingFastSubs!=null && competingFastSubs!=null) {
+ val p = nextEntry
+ p.addBrowsing(browsingFastSubs)
+ p.addCompeting(competingFastSubs)
+ p
+ } else {
+ null
+ }
+ }
+ }
+ }
+
}
class Subscription(queue:Queue) extends DeliveryProducer {
def dispatchQueue = queue.dispatchQueue
- var dispatched = List[PagedEntry]()
+ var dispatched = List[QueueEntry]()
var session: Session = null
- var pos:PagedEntry = null
+ var pos:QueueEntry = null
- def position(value:PagedEntry):Unit = {
+ def position(value:QueueEntry):Unit = {
pos = value
session.refiller = pos
}
@@ -488,14 +621,17 @@ class Subscription(queue:Queue) extends
pos.removeCompeting(this)
session.close
session = null
- queue.messages.nack(dispatched)
+ queue.nack(dispatched)
}
- def matches(entry:PagedEntry) = session.consumer.matches(entry.delivery)
+ def matches(entry:Delivery) = session.consumer.matches(entry)
- def offer(entry:PagedEntry) = {
- if (session.offer(entry.delivery)) {
- dispatched = dispatched ::: entry :: Nil
+ def offer(delivery:Delivery) = {
+ if (session.offer(delivery)) {
+ if( delivery.ack!=null ) {
+ val entry = delivery.ack.asInstanceOf[QueueEntry]
+ dispatched = dispatched ::: entry :: Nil
+ }
true
} else {
false
@@ -504,25 +640,23 @@ class Subscription(queue:Queue) extends
// called from the consumer thread.. send it to the ack_source
// do that it calls _ack from the queue thread.
- override def ack(delivery: Delivery) = {
- queue.ack_source.merge((this, delivery))
+ override def ack(value: Any) = {
+ val entry = value.asInstanceOf[QueueEntry]
+ queue.ack_source.merge((this, entry))
}
- def _ack(delivery: Delivery): Unit = {
+ def _ack(entry: QueueEntry): Unit = {
assert(!dispatched.isEmpty)
- val entry = if (dispatched.head.delivery == delivery) {
+ if (dispatched.head == entry) {
// this should be the common case...
- val rc = dispatched.head
dispatched = dispatched.drop(1)
- rc
} else {
// but lets also handle the case where we get an ack out of order.
- val rc = dispatched.partition(_.delivery == delivery)
+ val rc = dispatched.partition(_ == entry)
assert(rc._1.size == 1)
dispatched = rc._2
- rc._1.head
}
- queue.messages.ack(entry)
+ queue.ack(entry)
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961116&r1=961115&r2=961116&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:59:33 2010
@@ -92,8 +92,8 @@ class StompProtocolHandler extends Proto
if( session.full ) {
false
} else {
- if( delivery.ack ) {
- producer.ack(delivery)
+ if( delivery.ack!=null ) {
+ producer.ack(delivery.ack)
}
val frame = delivery.message.asInstanceOf[StompFrameMessage].frame
val rc = session.offer(frame)