You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:59:25 UTC
svn commit: r961115 - in
/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker:
BrokerDatabase.scala Delivery.scala Queue.scala Sink.scala VirtualHost.scala
Author: chirino
Date: Wed Jul 7 03:59:25 2010
New Revision: 961115
URL: http://svn.apache.org/viewvc?rev=961115&view=rev
Log:
Beefed up queue implementation to support browsers and consumers consuming at different positions in the queue.
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala?rev=961115&r1=961114&r2=961115&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala Wed Jul 7 03:59:25 2010
@@ -23,6 +23,9 @@ import org.apache.activemq.util.TreeMap
import java.util.concurrent.atomic.{AtomicLong}
import java.util.{HashSet}
import collection.JavaConversions
+import org.fusesource.hawtdispatch.{BaseRetained, Retained}
+
+case class StoredMessageRef(id:Long) extends BaseRetained
class Record
case class QueueRecord(val id:Long, val name:AsciiBuffer, val parent:AsciiBuffer, val config:String) extends Record
@@ -78,15 +81,15 @@ class BrokerDatabase(host:VirtualHost) {
}
private val msg_id_generator = new AtomicLong
+
def createMessageRecord(msgId:AsciiBuffer, encoding:AsciiBuffer, message:Buffer) = {
val record = new MessageRecord(msg_id_generator.incrementAndGet, msgId, encoding, message)
messages.add(record)
+ StoredMessageRef(record.id)
}
-
-
case class QueueData(val record:QueueRecord) {
- var messges:List[Long] = Nil
+ var messges = new TreeMap[Long, Long]()
}
private object queues {
@@ -116,7 +119,7 @@ class BrokerDatabase(host:VirtualHost) {
if( qd.messges.isEmpty ) {
QueueInfo(qd.record, -1, -1, 0)
} else {
- QueueInfo(qd.record, qd.messges.head, qd.messges.last, qd.messges.size)
+ QueueInfo(qd.record, qd.messges.firstKey, qd.messges.lastKey, qd.messges.size)
}
)
}
@@ -133,4 +136,14 @@ class BrokerDatabase(host:VirtualHost) {
}
} >>: queues.dispatchQueue
+ // Records that the stored message `msg` occupies sequence position `seq` in
+ // the stored queue whose record id is `queue`: the queue's `messges` map
+ // gains the mapping seq -> msg.id.
+ //
+ // NOTE(review): `using(msg)` presumably keeps `msg` retained until the block
+ // has run, and `>>: queues.dispatchQueue` presumably schedules the block on
+ // that dispatch queue -- both helpers are defined elsewhere, confirm.
+ // NOTE(review): the result of `queues.records.get(queue)` is not checked, so
+ // an unknown queue id would fail on the next line if `get` returns null.
+ def addMessageToQueue(queue:Long, seq:Long, msg:StoredMessageRef) = using(msg) {
+ val qd = queues.records.get(queue);
+ qd.messges.put(seq, msg.id)
+ } >>: queues.dispatchQueue
+
+ // Removes the mapping stored at sequence position `seq` from the `messges`
+ // map of the stored queue whose record id is `queue`.
+ //
+ // NOTE(review): `retained` is only handed to `using(...)`; presumably it is
+ // held until the block has run. Confirm that `using` tolerates a null
+ // argument before passing null here.
+ // NOTE(review): the result of `queues.records.get(queue)` is not checked, so
+ // an unknown queue id would fail on the next line if `get` returns null.
+ def removeMessageFromQueue(queue:Long, seq:Long, retained:Retained) = using(retained) {
+ val qd = queues.records.get(queue);
+ qd.messges.remove(seq)
+ } >>: queues.dispatchQueue
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961115&r1=961114&r2=961115&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:59:25 2010
@@ -46,7 +46,7 @@ trait DeliveryProducer {
*/
trait DeliveryConsumer extends Retained {
def dispatchQueue:DispatchQueue;
- def matches(message:Delivery)
+ def matches(message:Delivery):Boolean
def connect(producer:DeliveryProducer):Session
}
@@ -110,8 +110,6 @@ trait Message {
}
-case class StoredMessageRef(id:Long) extends BaseRetained
-
/**
* <p>
* A new Delivery object is created every time a message is transfered between a producer and
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961115&r1=961114&r2=961115&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 03:59:25 2010
@@ -17,53 +17,54 @@
package org.apache.activemq.apollo.broker
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import java.util.{LinkedList}
+import org.apache.activemq.util.TreeMap
import collection.{SortedMap}
-import java.util.LinkedList
-import org.fusesource.hawtdispatch.{EventAggregators, DispatchQueue, BaseRetained}
+import org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue, BaseRetained}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait QueueLifecyleListener {
- /**
- * A destination has bean created
- *
- * @param queue
- */
- def onCreate(queue:Queue);
-
- /**
- * A destination has bean destroyed
- *
- * @param queue
- */
- def onDestroy(queue:Queue);
+ /**
+ * A destination has been created
+ *
+ * @param queue
+ */
+ def onCreate(queue: Queue);
-}
+ /**
+ * A destination has been destroyed
+ *
+ * @param queue
+ */
+ def onDestroy(queue: Queue);
-object QueueEntry extends Sizer[QueueEntry] {
- def size(value:QueueEntry):Int = value.delivery.size
}
-class QueueEntry(val seq:Long, val delivery:Delivery)
+
object Queue extends Log {
- val maxOutboundSize = 1024*1204*5
+ val maxOutboundSize = 1024 * 1204 * 5
}
/**
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val destination:Destination, val storeId:Long) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
+class Queue(val host: VirtualHost, val destination: Destination, val storeId: Long) extends BaseRetained with Route with DeliveryConsumer with DispatchLogging {
override protected def log = Queue
- override val dispatchQueue:DispatchQueue = createQueue("queue:"+destination);
+ import Queue._
+
+ var consumerSubs = Map[DeliveryConsumer, Subscription]()
+
+ override val dispatchQueue: DispatchQueue = createQueue("queue:" + destination);
dispatchQueue.setTargetQueue(getRandomThreadQueue)
dispatchQueue {
- debug("created queue for: "+destination)
+ debug("created queue for: " + destination)
}
- setDisposer(^{
+ setDisposer(^ {
dispatchQueue.release
session_manager.release
})
@@ -71,62 +72,106 @@ class Queue(val destination:Destination,
// sequence numbers.. used to track what's in the store.
var first_seq = -1L
var last_seq = -1L
- var message_seq_counter=0L
+ var message_seq_counter = 0L
var count = 0
- val pending = new QueueSink[QueueEntry](QueueEntry)
- val session_manager = new SinkMux[Delivery](MapSink(pending){ x=>accept(x) }, dispatchQueue, Delivery)
- pending.drainer = ^{ drain }
+ var maxSize: Int = 1024 * 32
+ val headEntry = new PagedEntry(this)
+ headEntry.seq = -1
+ var tailEntry = new PagedEntry(this)
- /////////////////////////////////////////////////////////////////////
- //
- // Implementation of the Route trait. Allows consumers to bind/unbind
- // from this queue so that it can send messages to them.
- //
- /////////////////////////////////////////////////////////////////////
+ object messages extends Sink[PagedEntry] {
+ var counter = 0
+ val entries = new TreeMap[Long, PagedEntry]()
+ entries.put(headEntry.seq, headEntry)
+
+ private var size = 0
+ var refiller: Runnable = null
- class ConsumerState(val session:Session) extends Runnable {
- session.refiller = this
- var bound=true
- var ready=true
-
- def run() = {
- if( bound && !ready ) {
- ready = true
- readyConsumers.addLast(this)
- drain
+ def full = size >= maxSize
+
+ def offer(value: PagedEntry): Boolean = {
+
+ if (full) {
+ false
+ } else {
+
+ val ref = value.delivery.ref
+ if (ref != null) {
+ host.database.addMessageToQueue(storeId, value.seq, ref)
+ ref.release
+ }
+
+ size += value.delivery.size
+ entries.put(value.seq, value)
+ counter += 1;
+
+ if( !value.isEmpty ) {
+ value.dispatch
+ }
+ true
}
}
- }
- var allConsumers = Map[DeliveryConsumer,ConsumerState]()
- val readyConsumers = new LinkedList[ConsumerState]()
-
- def connected(consumers:List[DeliveryConsumer]) = bind(consumers)
- def bind(consumers:List[DeliveryConsumer]) = retaining(consumers) {
- for ( consumer <- consumers ) {
- val cs = new ConsumerState(consumer.connect(Queue.this))
- allConsumers += consumer->cs
- readyConsumers.addLast(cs)
- }
- drain
- } >>: dispatchQueue
-
- def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
- for ( consumer <- consumers ) {
- allConsumers.get(consumer) match {
- case Some(cs)=>
- cs.bound = false
- cs.session.close
- allConsumers -= consumer
- readyConsumers.remove(cs)
- case None=>
- }
+ def ack(value: PagedEntry) = {
+
+ if (value.delivery.ref != null) {
+ host.database.removeMessageFromQueue(storeId, value.seq, null)
}
- } >>: dispatchQueue
- def disconnected() = throw new RuntimeException("unsupported")
+ counter -= 1
+ size -= value.delivery.size
+
+ value.delivery = null
+
+ // acked entries turn into a tombstone entry.. adjacent tombstones
+ // aggregate into a single entry.
+ var current = entries.getEntry(value.seq)
+ assert(current != null)
+
+ // Merge /w previous if possible
+ var adj = current.previous
+ if (adj.getValue.mergeTomestone(current.getValue)) {
+ entries.removeEntry(current)
+ current = adj
+ }
+
+ // Merge /w next if possible
+ adj = current.next
+ if (adj != null && current.getValue.mergeTomestone(adj.getValue)) {
+ entries.removeEntry(adj)
+ }
+
+
+ if (counter == 0) {
+ refiller.run
+ }
+ }
+
+
+ def nack(values: List[PagedEntry]) = {
+ for (v <- values) {
+ v.unaquire;
+ // TODO: re-dispatch em.
+ }
+ }
+
+ }
+
+ val session_manager = new SinkMux[Delivery](MapSink(messages) {x => accept(x)}, dispatchQueue, Delivery)
+
+ val ack_source = createSource(new ListEventAggregator[(Subscription, Delivery)](), dispatchQueue)
+ ack_source.setEventHandler(^ {drain_acks});
+ ack_source.resume
+
+ def drain_acks = {
+ ack_source.getData.foreach {
+ event =>
+ event._1._ack(event._2)
+ }
+ }
+
/////////////////////////////////////////////////////////////////////
//
@@ -135,12 +180,13 @@ class Queue(val destination:Destination,
//
/////////////////////////////////////////////////////////////////////
- def matches(message:Delivery) = { true }
+ def matches(message: Delivery) = {true}
- def connect(p:DeliveryProducer) = new Session {
+ def connect(p: DeliveryProducer) = new Session {
retain
override def consumer = Queue.this
+
override def producer = p
val session = session_manager.open(producer.dispatchQueue)
@@ -152,8 +198,9 @@ class Queue(val destination:Destination,
// Delegate all the flow control stuff to the session
def full = session.full
- def offer(value:Delivery) = {
- if( session.full ) {
+
+ def offer(value: Delivery) = {
+ if (session.full) {
false
} else {
val rc = session.offer(sent(value))
@@ -161,28 +208,41 @@ class Queue(val destination:Destination,
true
}
}
-
+
def refiller = session.refiller
- def refiller_=(value:Runnable) = { session.refiller=value }
- }
+ def refiller_=(value: Runnable) = {session.refiller = value}
+ }
/////////////////////////////////////////////////////////////////////
//
- // Implementation of the DeliveryProducer trait.
- // It mainly deals with handling message acks from bound consumers.
+ // Implementation of the Route trait. Allows consumers to bind/unbind
+ // from this queue so that it can send messages to them.
//
/////////////////////////////////////////////////////////////////////
- val ack_source = createSource(EventAggregators.INTEGER_ADD, dispatchQueue)
- ack_source.setEventHandler(^{drain_acks});
- ack_source.resume
- def drain_acks = {
- pending.ack(ack_source.getData.intValue)
- }
- override def ack(ack:Delivery) = {
- ack_source.merge(ack.size)
- }
+ def connected(consumers: List[DeliveryConsumer]) = bind(consumers)
+
+ def bind(consumers: List[DeliveryConsumer]) = retaining(consumers) {
+ for (consumer <- consumers) {
+ val subscription = new Subscription(this)
+ subscription.connect(consumer)
+ consumerSubs += consumer -> subscription
+ }
+ } >>: dispatchQueue
+
+ def unbind(consumers: List[DeliveryConsumer]) = releasing(consumers) {
+ for (consumer <- consumers) {
+ consumerSubs.get(consumer) match {
+ case Some(cs) =>
+ cs.close
+ consumerSubs -= consumer
+ case None =>
+ }
+ }
+ } >>: dispatchQueue
+
+ def disconnected() = throw new RuntimeException("unsupported")
/////////////////////////////////////////////////////////////////////
//
@@ -195,8 +255,10 @@ class Queue(val destination:Destination,
* processed by the queues' thread.. therefore we don't
* yet know the order of the delivery in the queue.
*/
- private def sent(delivery:Delivery) = {
- if( delivery.ref !=null ) {
+ private def sent(delivery: Delivery) = {
+ if (delivery.ref != null) {
+ // retain the persistent ref so that the delivery is not
+ // considered completed until this queue stores it
delivery.ref.retain
}
delivery
@@ -206,30 +268,16 @@ class Queue(val destination:Destination,
* Called from the queue thread. At this point we
* know the order. Converts the delivery to a QueueEntry
*/
- private def accept(delivery:Delivery) = {
+ private def accept(delivery: Delivery) = {
val d = delivery.copy
d.ack = true
- new QueueEntry(next_message_seq, d)
+ val rc = tailEntry
+ tailEntry = new PagedEntry(this)
+ rc.seq = next_message_seq
+ rc.delivery = d
+ rc
}
- /**
- * Dispatches as many messages to ready consumers
- * as possible.
- */
- private def drain: Unit = {
- while (!readyConsumers.isEmpty && !pending.isEmpty) {
- val cs = readyConsumers.removeFirst
- val queueEntry = pending.poll
- if( cs.session.offer(queueEntry.delivery) ) {
- // consumer was not full.. keep him in the ready list
- readyConsumers.addLast(cs)
- } else {
- // consumer full.
- cs.ready = false
- pending.unpoll(queueEntry)
- }
- }
- }
private def next_message_seq = {
val rc = message_seq_counter
@@ -239,3 +287,242 @@ class Queue(val destination:Destination,
}
+
+/**
+ * Sizer for PagedEntry values: an entry is as large as the delivery it holds.
+ */
+object PagedEntry extends Sizer[PagedEntry] {
+  // NOTE(review): a tombstone entry has a null delivery, so sizing one would
+  // throw a NullPointerException -- confirm tombstones are never sized.
+  def size(value: PagedEntry): Int = value.delivery.size
+
+}
+
+/**
+ * One position in a queue's sequence of entries.
+ *
+ * An entry holds the delivery stored at sequence `seq` together with the
+ * subscriptions currently positioned on it (`competing` and `browsing`).
+ * Once its delivery is set to null the entry is a "tombstone"; adjacent
+ * tombstones are merged into one entry whose `count` is the number of
+ * consecutive sequences it covers.
+ *
+ * The entry is a Runnable: running it dispatches this entry and then every
+ * following entry that `dispatch` hands back.
+ *
+ * @param queue the queue this entry belongs to
+ */
+class PagedEntry(val queue:Queue) extends Comparable[PagedEntry] with Runnable {
+
+  /**
+   * Orders entries by sequence number.
+   */
+  def compareTo(o: PagedEntry) = {
+    // Compare instead of subtracting: narrowing (seq - o.seq) to an Int
+    // reports the wrong sign (or a false 0) once the two sequences are
+    // further apart than an Int can represent.
+    if (seq < o.seq) -1 else if (seq > o.seq) 1 else 0
+  }
+
+  // The delivery held by this entry; null marks a tombstone.
+  var delivery: Delivery = null
+  // Sequence number of this entry within the queue.
+  var seq: Long = -1
+  // Number of consecutive sequences covered by this entry (grows on merge).
+  var count: Long = 1
+  // True while a competing subscription has claimed this entry.
+  var aquired = false
+  // Subscriptions positioned on this entry.
+  var competing:List[Subscription] = Nil
+  var browsing:List[Subscription] = Nil
+
+  /**
+   * Claims the entry.
+   *
+   * @return true if the entry was free and is now claimed, false if it was
+   *         already claimed
+   */
+  def aquire() = {
+    if (aquired) {
+      false
+    } else {
+      aquired = true
+      true
+    }
+  }
+
+  /**
+   * Releases a claim taken with aquire(). The entry must currently be claimed.
+   */
+  def unaquire() = {
+    assert(aquired)
+    aquired = false
+  }
+
+
+  /**
+   * Absorbs `next` into this entry when both are tombstones and `next`
+   * starts exactly where this entry's range ends. Subscriptions positioned
+   * on `next` are moved onto this entry.
+   *
+   * @return true if `next` was merged (the caller should then drop it),
+   *         false otherwise
+   */
+  def mergeTomestone(next: PagedEntry): Boolean = {
+    if (tomestone && next.tomestone && seq + count == next.seq) {
+      count += next.count
+      if( next.browsing!=Nil || next.competing!=Nil ){
+        addBrowsing(next.browsing)
+        addCompeting(next.competing)
+        next.browsing = Nil
+        next.competing = Nil
+      }
+      true
+    } else {
+      false
+    }
+  }
+
+  /** True when this entry no longer holds a delivery. */
+  def tomestone = {
+    delivery == null
+  }
+
+  /** True when no subscription is positioned on this entry. */
+  def isEmpty = competing == Nil && browsing == Nil
+
+
+  /**
+   * Dispatches this entry, then keeps dispatching each entry returned by
+   * dispatch until one returns null.
+   */
+  def run() = {
+    var next = dispatch()
+    while( next!=null ) {
+      next = next.dispatch
+    }
+  }
+
+  /**
+   * Offers this entry to the subscriptions positioned on it and moves the
+   * ones that are done with it to the next entry.
+   *
+   * @return the next entry to dispatch, or null when dispatching should stop
+   */
+  def dispatch():PagedEntry = {
+
+    if( this == queue.tailEntry ) {
+
+      // The tail entry does not hold data..
+      null
+
+    } else if( this == queue.headEntry ) {
+
+      // The head entry does not hold any data.. so just move
+      // any assigned subs to the next entry.
+
+      val p = nextEntry
+      p.addBrowsing(browsing)
+      p.addCompeting(competing)
+      browsing = Nil
+      competing = Nil
+      p
+
+    } else {
+
+      // NOTE(review): a tombstone that is neither head nor tail also reaches
+      // this branch, and its null delivery is then handed to the
+      // subscriptions' matches/offer -- confirm that case is handled.
+
+      var browsingSlowSubs:List[Subscription] = Nil
+      var browsingFastSubs:List[Subscription] = Nil
+      var competingSlowSubs:List[Subscription] = Nil
+      var competingFastSubs:List[Subscription] = Nil
+
+      if( browsing!=Nil ) {
+        browsing.foreach { sub =>
+          if (sub.matches(this)) {
+            if (sub.offer(this)) {
+              browsingFastSubs ::= sub
+            } else {
+              // the browser could not take it now.. it stays here.
+              browsingSlowSubs ::= sub
+            }
+          } else {
+            // not interested in this entry.. move along.
+            browsingFastSubs ::= sub
+          }
+        }
+      }
+
+      if( competing!=Nil ) {
+        if (!this.aquired) {
+          this.aquire()
+
+          // Offer to the competing subs in order until one accepts.
+          var picked: Subscription = null
+          var remaining = competing
+          while( remaining!=Nil && picked == null ) {
+            val sub = remaining.head
+            remaining = remaining.drop(1)
+
+            if (sub.matches(this)) {
+              competingSlowSubs = competingSlowSubs ::: sub :: Nil
+              if (sub.offer(this)) {
+                picked = sub
+              }
+            } else {
+              competingFastSubs = competingFastSubs ::: sub :: Nil
+            }
+          }
+
+          if (picked == null) {
+            // nobody took it.. give up the claim.
+            this.unaquire()
+          } else {
+            // the entry is taken, so every competing sub moves on.
+            competingFastSubs = remaining ::: competingFastSubs ::: competingSlowSubs
+            competingSlowSubs = Nil
+          }
+        } else {
+          // already claimed by someone else.. all competing subs move on.
+          competingFastSubs = competing
+        }
+      }
+
+      // The slow subs stay on this entry..
+      browsing = browsingSlowSubs
+      competing = competingSlowSubs
+
+      // the fast subs move on to the next entry...
+      // Both lists are never null, so test for emptiness: when nobody moved
+      // on there is nothing left to dispatch further down the queue.
+      if ( browsingFastSubs!=Nil || competingFastSubs!=Nil ) {
+        val p = nextEntry
+        p.addBrowsing(browsingFastSubs)
+        p.addCompeting(competingFastSubs)
+        p
+      } else {
+        null
+      }
+    }
+  }
+
+  /** Positions the given browsing subscriptions on this entry. */
+  def addBrowsing(l:List[Subscription]) = {
+    l.foreach(x=>x.position(this))
+    browsing :::= l
+  }
+
+  /** Positions the given competing subscriptions on this entry. */
+  def addCompeting(l:List[Subscription]) = {
+    l.foreach(x=>x.position(this))
+    competing :::= l
+  }
+
+  /** Detaches a browsing subscription from this entry and clears its position. */
+  def removeBrowsing(s:Subscription) = {
+    s.position(null)
+    browsing = browsing.filterNot(_ == s)
+  }
+
+  /** Detaches a competing subscription from this entry and clears its position. */
+  def removeCompeting(s:Subscription) = {
+    s.position(null)
+    competing = competing.filterNot(_ == s)
+  }
+
+  /**
+   * Finds the entry that follows this one.
+   *
+   * @return the entry stored at the first sequence after this entry's range,
+   *         or the queue's tail entry when there is none
+   */
+  def nextEntry():PagedEntry = {
+    // A merged tombstone covers `count` sequences, so the following entry
+    // lives at seq + count; for a regular entry count is 1.
+    var entry = queue.messages.entries.get(this.seq + this.count)
+    if (entry == null) {
+      entry = queue.tailEntry
+    }
+    entry
+  }
+
+}
+
+/**
+ * Links one consumer to a queue.
+ *
+ * The subscription keeps the consumer's session, the entry it is currently
+ * positioned on, and the entries that were offered to the consumer and
+ * accepted but not yet acked.
+ *
+ * @param queue the queue being consumed from
+ */
+class Subscription(queue:Queue) extends DeliveryProducer {
+
+  def dispatchQueue = queue.dispatchQueue
+
+  // Entries accepted by the session, oldest first, still waiting for an ack.
+  var dispatched: List[PagedEntry] = Nil
+  var session: Session = null
+  // The entry this subscription is currently positioned on.
+  var pos:PagedEntry = null
+
+  /**
+   * Moves the subscription to `value` and installs that entry as the
+   * session's refiller.
+   */
+  def position(value:PagedEntry):Unit = {
+    pos = value
+    session.refiller = value
+  }
+
+  /**
+   * Opens a session with `consumer`, places this subscription on the queue's
+   * head entry as a competing subscription and submits the head entry to the
+   * queue's dispatch queue.
+   */
+  def connect(consumer: DeliveryConsumer) = {
+    session = consumer.connect(this)
+    val head = queue.headEntry
+    head.addCompeting(List(this))
+    queue.dispatchQueue << queue.headEntry
+  }
+
+  /**
+   * Detaches from the current entry, closes the session and nacks every
+   * entry that was dispatched but never acked.
+   */
+  def close() = {
+    pos.removeCompeting(this)
+    session.close
+    session = null
+    queue.messages.nack(dispatched)
+  }
+
+  /** Asks the session's consumer whether it wants the entry's delivery. */
+  def matches(entry:PagedEntry) = session.consumer.matches(entry.delivery)
+
+  /**
+   * Offers the entry's delivery to the session.
+   *
+   * @return true if the session accepted it, in which case the entry is
+   *         appended to the list awaiting an ack; false otherwise
+   */
+  def offer(entry:PagedEntry) = {
+    val accepted = session.offer(entry.delivery)
+    if (accepted) {
+      dispatched = dispatched ::: List(entry)
+    }
+    accepted
+  }
+
+  // Called from the consumer thread.. hand the ack to the ack_source
+  // so that _ack gets called from the queue thread.
+  override def ack(delivery: Delivery) = {
+    queue.ack_source.merge((this, delivery))
+  }
+
+  /**
+   * Resolves an acked delivery back to its dispatched entry, drops that
+   * entry from the pending list and acks it on the queue.
+   */
+  def _ack(delivery: Delivery): Unit = {
+    assert(!dispatched.isEmpty)
+    val oldest = dispatched.head
+    val entry =
+      if (oldest.delivery == delivery) {
+        // the usual case: acks arrive in dispatch order.
+        dispatched = dispatched.tail
+        oldest
+      } else {
+        // an out of order ack: pick the matching entry out of the list.
+        val (acked, outstanding) = dispatched.partition(_.delivery == delivery)
+        assert(acked.size == 1)
+        dispatched = outstanding
+        acked.head
+      }
+    queue.messages.ack(entry)
+  }
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=961115&r1=961114&r2=961115&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Jul 7 03:59:25 2010
@@ -124,7 +124,11 @@ object MapSink {
def full = downstream.full
def offer(value:Y) = {
- downstream.offer(func(value))
+ if( full ) {
+ false
+ } else {
+ downstream.offer(func(value))
+ }
}
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961115&r1=961114&r2=961115&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 03:59:25 2010
@@ -120,7 +120,7 @@ class VirtualHost(val broker: Broker) ex
val dest = DestinationParser.parse(info.record.name, destination_parser_options)
if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
- val queue = new Queue(dest, id)
+ val queue = new Queue(this, dest, id)
queue.first_seq = info.first
queue.last_seq = info.last
queue.message_seq_counter = info.last+1
@@ -185,7 +185,7 @@ class VirtualHost(val broker: Broker) ex
rc match {
case Some(id) =>
dispatchQueue ^ {
- val queue = new Queue(dest, id)
+ val queue = new Queue(this, dest, id)
queues.put(name, queue)
cb(queue)
}