You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:59:45 UTC
svn commit: r961117 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
Author: chirino
Date: Wed Jul 7 03:59:44 2010
New Revision: 961117
URL: http://svn.apache.org/viewvc?rev=961117&view=rev
Log:
working on the store interfaces
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala
- copied, changed from r961116, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala
Removed:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala Wed Jul 7 03:59:44 2010
@@ -21,97 +21,147 @@ import _root_.org.fusesource.hawtdispatc
import org.fusesource.hawtbuf._
import org.apache.activemq.util.TreeMap
import java.util.concurrent.atomic.{AtomicLong}
-import java.util.{HashSet}
import collection.JavaConversions
-import org.fusesource.hawtdispatch.{BaseRetained, Retained}
+import java.util.{ArrayList, HashSet}
+import collection.mutable.HashMap
+import org.apache.activemq.Service
+import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained, Retained}
-case class StoredMessageRef(id:Long) extends BaseRetained
-
-class Record
-case class QueueRecord(val id:Long, val name:AsciiBuffer, val parent:AsciiBuffer, val config:String) extends Record
-class MessageRecord(val id:Long, val msgId:AsciiBuffer, val encoding: AsciiBuffer, val message:Buffer) extends Record
-class QueueEntryRecord(val queue:Long, val seqId:Long, val msgId:Long) extends Record
-class SubscriptionRecord(val id:Long, val pk:AsciiBuffer, val selector:AsciiBuffer, val destination:AsciiBuffer, val durable:Boolean, val tte:Long, val attachment:Buffer) extends Record
-class Action
-case class CreateRecord(record:Record) extends Action
-case class UpdateRecord(record:Record) extends Action
-case class DeleteRecord(id:Long) extends Action
+case class QueueRecord(val id:Long, val name:AsciiBuffer, val parent:AsciiBuffer, val config:String)
+case class QueueInfo(record:QueueRecord, first:Long, last:Long, count:Int)
/**
+ * A StoreTransaction is used to perform persistent
+ * operations as unit of work.
+ *
+ * The disposer assigned to the store transaction will
+ * be executed once all associated persistent operations
+ * have been persisted.
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class BrokerDatabase(host:VirtualHost) {
+trait StoreTransaction extends Retained {
- def start() ={
- }
+ /**
+ * Assigns the delivery a store id if it did not already
+ * have one assigned.
+ */
+ def store(delivery:Delivery)
+
+ /**
+ * Adds a delivery to a specified queue at a the specified position in the queue.
+ */
+ def enqueue(queue:Long, seq:Long, msg:Long)
+
+ /**
+ * Removes a delivery from a specified queue at a the specified position in the queue.
+ */
+ def dequeue(queue:Long, seq:Long, msg:Long)
- def stop() = {
- }
+}
- private object messages {
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait BrokerDatabase extends Service {
- val dispatchQueue = createQueue("MessagesTable")
- val messages = new TreeMap[Long, MessageRecord]
- val inprogress = new HashSet[Long]()
- def add(record:MessageRecord) = {
- val id= record.id
+ /**
+ * Stores a queue, calls back with a unquie id for the stored queue.
+ */
+ def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit
+
+ /**
+ * Loads the queue information for a given queue id.
+ */
+ def getQueueInfo(id:Long)(cb:(Option[QueueInfo])=>Unit )
+
+ /**
+ * gets a listing of all queues previously added.
+ */
+ def listQueues(cb: (Seq[Long])=>Unit )
+
+ /**
+ * Removes a the delivery associated with the provided from any
+ * internal buffers/caches. The callback is executed once, the message is
+ * no longer buffered.
+ */
+ def flushDelivery(id:Long)(cb: =>Unit)
+
+ /**
+ * Loads a delivery with the associated id from persistent storage.
+ */
+ def loadDelivery(id:Long)(cb:(Option[Delivery])=>Unit )
+
+ /**
+ * Creates a StoreTransaction which is used to perform persistent
+ * operations as unit of work.
+ */
+ def createStoreTransaction():StoreTransaction
- // the inprogress list protects the message from being
- // gced too soon. Protection ends once StoredMessageRef
- // is disposed..
- val ref = new StoredMessageRef(id) {
- override def dispose = ^{
- inprogress.remove(id)
- } >>: dispatchQueue
- }
+}
- using(ref) {
- inprogress.add(id)
- messages.put(record.id, record)
- } >>: dispatchQueue
+class Counter(private var value:Int = 0) {
- ref
- }
+ def get() = value
- def get(id:Long, cb:(MessageRecord)=>Unit) = reply(cb) {
- messages.get(id)
- } >>: dispatchQueue
+ def incrementAndGet() = addAndGet(1)
+ def decrementAndGet() = addAndGet(-1)
+ def addAndGet(amount:Int) = {
+ value+=amount
+ value
+ }
+ def getAndIncrement() = getAndAdd(1)
+ def getAndDecrement() = getAndAdd(-11)
+ def getAndAdd(amount:Int) = {
+ val rc = value
+ value+=amount
+ rc
}
- private val msg_id_generator = new AtomicLong
+}
- def createMessageRecord(msgId:AsciiBuffer, encoding:AsciiBuffer, message:Buffer) = {
- val record = new MessageRecord(msg_id_generator.incrementAndGet, msgId, encoding, message)
- messages.add(record)
- StoredMessageRef(record.id)
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MemoryBrokerDatabase(host:VirtualHost) extends BaseService with BrokerDatabase {
+
+ val dispatchQueue = createQueue("MessagesTable")
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Methods related to Service interface impl
+ //
+ /////////////////////////////////////////////////////////////////////
+
+ protected def _stop(onCompleted: Runnable) = {
+ onCompleted.run
}
- case class QueueData(val record:QueueRecord) {
- var messges = new TreeMap[Long, Long]()
+ protected def _start(onCompleted: Runnable) = {
+ onCompleted.run
}
- private object queues {
- var _next_id = 0L;
- def next_id = {
- val rc = _next_id
- _next_id += 1
- rc
- }
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Methods related to queue management
+ //
+ /////////////////////////////////////////////////////////////////////
+ private val queue_id_generator = new AtomicLong
+ val queues = new TreeMap[Long, QueueData]
- val dispatchQueue = createQueue("QueuesTable")
- val records = new TreeMap[Long, QueueData]
+ case class QueueData(val record:QueueRecord) {
+ var messges = new TreeMap[Long, Long]()
}
- case class QueueInfo(record:QueueRecord, first:Long, last:Long, count:Int)
-
def listQueues(cb: (Seq[Long])=>Unit ) = reply(cb) {
- JavaConversions.asSet(queues.records.keySet).toSeq
- } >>: queues.dispatchQueue
+ JavaConversions.asSet(queues.keySet).toSeq
+ } >>: dispatchQueue
def getQueueInfo(id:Long)(cb:(Option[QueueInfo])=>Unit ) = reply(cb) {
- val qd = queues.records.get(id)
+ val qd = queues.get(id)
if( qd == null ) {
None
} else {
@@ -123,27 +173,114 @@ class BrokerDatabase(host:VirtualHost) {
}
)
}
- } >>: queues.dispatchQueue
-
+ } >>: dispatchQueue
def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit = reply(cb) {
- val id = queues.next_id
- if( queues.records.containsKey(id) ) {
+ val id = queue_id_generator.incrementAndGet
+ if( queues.containsKey(id) ) {
None
} else {
- queues.records.put(id, QueueData(record))
+ queues.put(id, QueueData(record))
Some(id)
}
- } >>: queues.dispatchQueue
+ } >>: dispatchQueue
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Methods related to message storage
+ //
+ /////////////////////////////////////////////////////////////////////
+ class MessageData(val delivery:Delivery) {
+ val queueRefs = new Counter()
+ var onFlush = List[()=>Unit]()
+ }
+
+ private val msg_id_generator = new AtomicLong
+ val messages = new TreeMap[Long, MessageData]
+
+ def flushDelivery(msg:Long)(cb: =>Unit) = ^{
+ val rc = messages.get(msg)
+ if( rc == null ) {
+ cb
+ } else {
+ rc.onFlush ::= cb _
+ }
+ } >>: dispatchQueue
+
+ def loadDelivery(ref:Long)(cb:(Option[Delivery])=>Unit ) = reply(cb) {
+ val rc = messages.get(ref)
+ if( rc == null ) {
+ None
+ } else {
+ Some(rc.delivery)
+ }
+ } >>: dispatchQueue
+
+ /////////////////////////////////////////////////////////////////////
+ //
+ // Methods related to store transactions
+ //
+ /////////////////////////////////////////////////////////////////////
+ val transactions = new HashSet[MemoryStoreTransaction]()
+
+ def createStoreTransaction() = {
+ val tx = new MemoryStoreTransaction()
+ using(tx) {
+ transactions.add(tx)
+ } >>: dispatchQueue
+ tx
+ }
- def addMessageToQueue(queue:Long, seq:Long, msg:StoredMessageRef) = using(msg) {
- val qd = queues.records.get(queue);
- qd.messges.put(seq, msg.id)
- } >>: queues.dispatchQueue
-
- def removeMessageFromQueue(queue:Long, seq:Long, retained:Retained) = using(retained) {
- val qd = queues.records.get(queue);
- qd.messges.remove(seq)
- } >>: queues.dispatchQueue
+ class MemoryStoreTransaction extends BaseRetained with StoreTransaction {
+
+ val updated = HashMap[Long, MessageData]()
+
+ def store(delivery:Delivery) = {
+ if( delivery.storeId == -1 ) {
+ delivery.storeId = msg_id_generator.incrementAndGet
+ using(this) {
+ val md = new MessageData(delivery)
+ updated.put(delivery.storeId, md)
+ messages.put(delivery.storeId, md)
+ } >>: dispatchQueue
+ }
+ }
+
+ def enqueue(queue:Long, seq:Long, msg:Long) = {
+ using(this) {
+ val qd = queues.get(queue)
+ if( qd!=null ) {
+ val md = updated.getOrElse(msg, messages.get(msg))
+ md.queueRefs.incrementAndGet
+ qd.messges.put(seq, msg)
+ }
+ } >>: dispatchQueue
+ }
+
+ def dequeue(queue:Long, seq:Long, msg:Long) = {
+ using(this) {
+ val qd = queues.get(queue)
+ if( qd!=null ) {
+ val md = updated.getOrElse(msg, messages.get(msg))
+ md.queueRefs.decrementAndGet
+ qd.messges.remove(seq)
+ }
+ } >>: dispatchQueue
+ }
+
+
+ override def dispose = {
+ dispatchQueue {
+ updated.foreach{ x=>
+ if( x._2.queueRefs.get == 0 ) {
+ messages.remove(x._1)
+ x._2.onFlush.foreach( _() )
+ }
+ }
+ transactions.remove(MemoryStoreTransaction.this)
+ super.dispose
+ }
+ }
+ }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:59:44 2010
@@ -28,7 +28,6 @@ import org.fusesource.hawtbuf._
trait DeliveryProducer {
def dispatchQueue:DispatchQueue
- def ack(value:Any) = {}
def collocate(value:DispatchQueue):Unit = {
if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
@@ -47,17 +46,17 @@ trait DeliveryProducer {
trait DeliveryConsumer extends Retained {
def dispatchQueue:DispatchQueue;
def matches(message:Delivery):Boolean
- def connect(producer:DeliveryProducer):Session
+ def connect(producer:DeliveryProducer):DeliverySession
}
/**
- * Before a derlivery producer can send Delivery objects to a delivery
+ * Before a delivery producer can send Delivery objects to a delivery
* consumer, it creates a Delivery session which it uses to send
* the deliveries over.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait Session extends Sink[Delivery] {
+trait DeliverySession extends Sink[Delivery] {
def producer:DeliveryProducer
def consumer:DeliveryConsumer
def close:Unit
@@ -108,6 +107,11 @@ trait Message {
*/
def messageEvaluationContext:MessageEvaluationContext
+ /**
+ * The protocol encoding of the message.
+ */
+ def protocol:String
+
}
/**
@@ -133,35 +137,32 @@ class Delivery extends BaseRetained {
var size:Int = 0
/**
- * the encoding format of the message
- */
- var encoding: String = null
-
- /**
* the message being delivered
*/
var message: Message = null
/**
- * the encoded form of the message being delivered.
+ * A reference to the stored version of the message.
*/
- var encoded: Buffer = null
+ var storeId:Long = -1
- var ref:StoredMessageRef = null
-
- def copy() = (new Delivery).set(this)
+ /**
+ * The transaction the delivery is participating in.
+ */
+ var storeTx:StoreTransaction = null
/**
- * Set if the producer requires an ack to be sent back
+ * Set if the producer requires an ack to be sent back. Consumer
+ * should execute once the message is processed.
*/
- var ack:Any = null
+ var ack:(StoreTransaction)=>Unit = null
+
+ def copy() = (new Delivery).set(this)
def set(other:Delivery) = {
size = other.size
- encoding = other.encoding
message = other.message
- encoded = other.encoded
- ref = other.ref
+ storeId = other.storeId
this
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 03:59:44 2010
@@ -22,8 +22,7 @@ import collection.{SortedMap}
import org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue, BaseRetained}
import org.apache.activemq.util.TreeMap.TreeEntry
import java.util.{Collections, ArrayList, LinkedList}
-import org.apache.activemq.util.list.LinkedNode
-import org.apache.activemq.util.list.LinkedNodeList
+import org.apache.activemq.util.list.{LinkedNodeList, LinkedNode}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -67,10 +66,26 @@ class Queue(val host: VirtualHost, val d
debug("created queue for: " + destination)
}
setDisposer(^ {
+ ack_source.release
dispatchQueue.release
session_manager.release
})
+
+ val ack_source = createSource(new ListEventAggregator[(LinkedQueueEntry, StoreTransaction)](), dispatchQueue)
+ ack_source.setEventHandler(^ {drain_acks});
+ ack_source.resume
+
+ val store_load_source = createSource(new ListEventAggregator[(QueueEntry, Delivery)](), dispatchQueue)
+ store_load_source.setEventHandler(^ {drain_store_loads});
+ store_load_source.resume
+
+ val store_flush_source = createSource(new ListEventAggregator[QueueEntry](), dispatchQueue)
+ store_flush_source.setEventHandler(^ {drain_store_flushes});
+ store_flush_source.resume
+
+ val session_manager = new SinkMux[Delivery](messages, dispatchQueue, Delivery)
+
// sequence numbers.. used to track what's in the store.
var first_seq = -1L
var last_seq = -1L
@@ -85,6 +100,8 @@ class Queue(val host: VirtualHost, val d
val entries = new LinkedNodeList[QueueEntry]()
entries.addFirst(headEntry)
+ var flushingSize = 0
+
/**
* Tunning options.
*/
@@ -94,148 +111,89 @@ class Queue(val host: VirtualHost, val d
private var size = 0
- def swap() = {
-
- class Prio(val entry:QueueEntry) extends Comparable[Prio] {
- var value = 0
- def compareTo(o: Prio) = o.value - value
- }
-
- val prios = new ArrayList[Prio](count)
-
- var entry = entries.getHead
- while( entry!=null ) {
- if( entry.value.asTombstone == null ) {
- prios.add(new Prio(entry))
- }
- entry = entry.getNext
- }
-
-
- /**
- * adds keep priority to the range of entries starting at x
- * and spanning the size provided.
- */
- def prioritize(i:Int, size:Int, p:Int):Unit = {
- val prio = prios.get(i)
- prio.value += p
- val remainingSize = size - prio.entry.value.size
- if( remainingSize > 0 ) {
- val next = i + 1
- if( next < prios.size ) {
- prioritize(next, remainingSize, p-1)
- }
- }
- }
-
- // Prioritize the entries so that higher priority entries are swapped in,
- // and lower priority entries are swapped out.
- var i = 0
- while( i < prios.size ) {
- val prio = prios.get(i)
- if( prio.entry.hasSubs ) {
-
- var credits =0;
- if( prio.entry.competing != Nil) {
- credits += prio.entry.competing.size * tune_subscription_prefetch
- } else{
- if( prio.entry.browsing != Nil ) {
- credits += tune_subscription_prefetch
- }
- }
- prioritize(i, credits, 1000)
-
- }
- i += 1
- }
-
- Collections.sort(prios)
-
- var remaining = tune_max_size / 2
- i = 0
- while( i < prios.size ) {
- val prio = prios.get(i)
- val entry = prio.entry
- if( remaining > 0 ) {
- remaining -= entry.value.size
- }
+ object messages extends Sink[Delivery] {
- i += 1
- }
-
- }
-
- object messages extends Sink[QueueEntry] {
var refiller: Runnable = null
- def full = size >= tune_max_size
-
- def offer(value: QueueEntry): Boolean = {
+ def full = if(size >= tune_max_size)
+ true
+ else
+ false
+ def offer(delivery: Delivery): Boolean = {
if (full) {
false
} else {
- val ref = value.value.ref
- if (ref != null) {
- host.database.addMessageToQueue(storeId, value.seq, ref)
- ref.release
+ val entry = tailEntry
+ tailEntry = new QueueEntry(Queue.this)
+ entry.created(next_message_seq, delivery)
+
+ if( delivery.ack!=null ) {
+ delivery.ack(delivery.storeTx)
+ }
+ if (delivery.storeId != -1) {
+ delivery.storeTx.enqueue(storeId, entry.seq, delivery.storeId)
+ delivery.storeTx.release
}
- size += value.value.size
- entries.addLast(value)
+ size += entry.value.size
+ entries.addLast(entry)
counter += 1;
-// if( full ) {
+ if( full ) {
// swap
-// }
+ }
- if( value.hasSubs ) {
- value.dispatch
+ if( entry.hasSubs ) {
+ entry.dispatch
}
true
}
}
}
- def ack(entry: QueueEntry) = {
+ def ack(entry: QueueEntry, tx:StoreTransaction) = {
- if (entry.value.ref != null) {
- host.database.removeMessageFromQueue(storeId, entry.seq, null)
+ if (entry.value.ref != -1) {
+ val transaction = if( tx == null ) {
+ host.database.createStoreTransaction
+ } else {
+ tx
+ }
+ transaction.dequeue(storeId, entry.seq, entry.value.ref)
+ if( tx == null ) {
+ transaction.release
+ }
+ }
+ if( tx != null ) {
+ tx.release
}
counter -= 1
size -= entry.value.size
-
entry.tombstone
if (counter == 0) {
+// trace("empty.. triggering refill")
messages.refiller.run
}
}
- def nack(values: List[QueueEntry]) = {
+ def nack(values: LinkedNodeList[LinkedQueueEntry]) = {
// TODO:
- for (v <- values) {
- }
}
-
- val session_manager = new SinkMux[Delivery](MapSink(messages) {x => accept(x)}, dispatchQueue, Delivery)
-
- val ack_source = createSource(new ListEventAggregator[(Subscription, QueueEntry)](), dispatchQueue)
- ack_source.setEventHandler(^ {drain_acks});
- ack_source.resume
def drain_acks = {
ack_source.getData.foreach {
- event =>
- event._1._ack(event._2)
+ case (entry, tx) =>
+ entry.unlink
+ ack(entry.value, tx)
}
}
-
-
+
/////////////////////////////////////////////////////////////////////
//
// Implementation of the DeliveryConsumer trait. Allows this queue
@@ -245,7 +203,7 @@ class Queue(val host: VirtualHost, val d
def matches(message: Delivery) = {true}
- def connect(p: DeliveryProducer) = new Session {
+ def connect(p: DeliveryProducer) = new DeliverySession {
retain
override def consumer = Queue.this
@@ -262,11 +220,25 @@ class Queue(val host: VirtualHost, val d
// Delegate all the flow control stuff to the session
def full = session.full
- def offer(value: Delivery) = {
+ def offer(delivery: Delivery) = {
if (session.full) {
false
} else {
- val rc = session.offer(sent(value))
+
+ // Called from the producer thread before the delivery is
+ // processed by the queue's thread.. We don't
+ // yet know the order of the delivery in the queue.
+ if (delivery.storeId != -1) {
+ // If the message has a store id, then this delivery will
+ // need a tx to track the store changes.
+ if( delivery.storeTx == null ) {
+ delivery.storeTx = host.database.createStoreTransaction
+ } else {
+ delivery.storeTx.retain
+ }
+ }
+
+ val rc = session.offer(delivery)
assert(rc, "session should accept since it was not full")
true
}
@@ -313,37 +285,140 @@ class Queue(val host: VirtualHost, val d
//
/////////////////////////////////////////////////////////////////////
- /**
- * Called from the producer thread before the delivery is
- * processed by the queues' thread.. therefore we don't
- * yet know the order of the delivery in the queue.
- */
- private def sent(delivery: Delivery) = {
- if (delivery.ref != null) {
- // retain the persistent ref so that the delivery is not
- // considered completed until this queue stores it
- delivery.ref.retain
- }
- delivery
+ private def next_message_seq = {
+ val rc = message_seq_counter
+ message_seq_counter += 1
+ rc
}
- /**
- * Called from the queue thread. At this point we
- * know the order. Converts the delivery to a QueueEntry
- */
- private def accept(delivery: Delivery) = {
- val rc = tailEntry
- tailEntry = new QueueEntry(this)
- rc.loaded(next_message_seq, delivery)
+ def swap() = {
+
+ class Prio(val entry:QueueEntry) extends Comparable[Prio] {
+ var value = 0
+ def compareTo(o: Prio) = o.value - value
+ }
+
+ val prios = new ArrayList[Prio](count)
+
+ var entry = entries.getHead
+ while( entry!=null ) {
+ if( entry.value.asTombstone == null ) {
+ prios.add(new Prio(entry))
+ }
+ entry = entry.getNext
+ }
+
+
+ /**
+ * adds keep priority to the range of entries starting at x
+ * and spanning the size provided.
+ */
+ def prioritize(i:Int, size:Int, p:Int):Unit = {
+ val prio = prios.get(i)
+ prio.value += p
+ val remainingSize = size - prio.entry.value.size
+ if( remainingSize > 0 ) {
+ val next = i + 1
+ if( next < prios.size ) {
+ prioritize(next, remainingSize, p-1)
+ }
+ }
+ }
+
+ // Prioritize the entries so that higher priority entries are swapped in,
+ // and lower priority entries are swapped out.
+ var i = 0
+ while( i < prios.size ) {
+ val prio = prios.get(i)
+ if( prio.entry.hasSubs ) {
+
+ var credits =0;
+ if( prio.entry.competing != Nil) {
+ credits += prio.entry.competing.size * tune_subscription_prefetch
+ } else{
+ if( prio.entry.browsing != Nil ) {
+ credits += tune_subscription_prefetch
+ }
+ }
+ prioritize(i, credits, 1000)
+
+ }
+ i += 1
+ }
+
+ Collections.sort(prios)
+
+ var remaining = tune_max_size / 2
+ i = 0
+ while( i < prios.size ) {
+ val prio = prios.get(i)
+ val entry = prio.entry
+ if( remaining > 0 ) {
+ remaining -= entry.value.size
+ val stored = entry.value.asStored
+ if( stored!=null && !stored.loading) {
+ // start loading it back...
+ stored.loading = true
+ host.database.loadDelivery(stored.ref) { delivery =>
+ // pass off to a source so it can aggregate multiple
+ // loads to reduce cross thread synchronization
+ if( delivery.isDefined ) {
+ store_load_source.merge((entry, delivery.get))
+ }
+ }
+ }
+ } else {
+ // Chuck the reset out...
+ val loaded = entry.value.asLoaded
+ if( loaded!=null ) {
+ var ref = loaded.delivery.storeId
+ if( ref == -1 ) {
+ val tx = host.database.createStoreTransaction
+ tx.store(loaded.delivery)
+ tx.enqueue(storeId, entry.seq, loaded.delivery.storeId)
+ tx.release
+ }
+ flushingSize += entry.value.size
+ host.database.flushDelivery(ref) {
+ store_flush_source.merge(entry)
+ }
+ }
+ }
+ i += 1
+ }
}
+ def drain_store_loads() = {
+ val data = store_load_source.getData
+ var ready = List[QueueEntry]()
- private def next_message_seq = {
- val rc = message_seq_counter
- message_seq_counter += 1
- rc
+ data.foreach { event =>
+ val entry = event._1
+ entry.loaded(event._2)
+ size += entry.value.size
+
+ if( entry.hasSubs ) {
+ ready ::= entry
+ }
+ }
+
+ ready.foreach { entry =>
+ entry.dispatch
+ }
}
+ def drain_store_flushes() = {
+ store_flush_source.getData.foreach { entry =>
+ flushingSize -= entry.value.size
+
+ // by the time we get called back, subs my be interested in the entry
+ // or it may have been acked.
+ if( !entry.hasSubs && entry.value.asLoaded!=null ) {
+ size += entry.value.size
+ entry.stored
+ }
+ }
+ }
}
@@ -364,12 +439,23 @@ class QueueEntry(val queue:Queue) extend
(seq - o.seq).toInt
}
- def loaded(seq:Long, delivery:Delivery) = {
+ def created(seq:Long, delivery:Delivery) = {
this.seq = seq
this.value = new Loaded(delivery)
this
}
+ def loaded(delivery:Delivery) = {
+ this.value = new Loaded(delivery)
+ this
+ }
+
+ def stored() = {
+ val loaded = value.asLoaded
+ this.value = new Stored(loaded.delivery.storeId, loaded.size)
+ this
+ }
+
def tombstone = {
this.value = new Tombstone()
if( seq != -1L ) {
@@ -465,7 +551,7 @@ class QueueEntry(val queue:Queue) extend
trait EntryType {
def size:Int
def dispatch():QueueEntry
- def ref:StoredMessageRef
+ def ref:Long
def asTombstone:Tombstone = null
def asStored:Stored = null
@@ -477,7 +563,7 @@ class QueueEntry(val queue:Queue) extend
var count = 1L
def size = 0
- def ref = null
+ def ref = -1
override def asTombstone = this
@@ -492,12 +578,9 @@ class QueueEntry(val queue:Queue) extend
}
- class Stored extends EntryType {
-
- private var loading = false
+ class Stored(val ref:Long, val size:Int) extends EntryType {
- var ref:StoredMessageRef = null
- var size = 0
+ var loading = false
override def asStored = this
@@ -511,8 +594,9 @@ class QueueEntry(val queue:Queue) extend
class Loaded(val delivery: Delivery) extends EntryType {
var aquired = false
- def ref = delivery.ref
+ def ref = delivery.storeId
def size = delivery.size
+ def flushing = false
override def asLoaded = this
@@ -529,8 +613,6 @@ class QueueEntry(val queue:Queue) extend
if( browsing!=Nil ) {
val offering = delivery.copy
- offering.ack = null
-
browsing.foreach { sub =>
if (sub.matches(offering)) {
if (sub.offer(offering)) {
@@ -545,10 +627,6 @@ class QueueEntry(val queue:Queue) extend
}
if( competing!=Nil ) {
-
- val offering = delivery.copy
- offering.ack = QueueEntry.this
-
if (!this.aquired) {
aquired = true
@@ -557,12 +635,20 @@ class QueueEntry(val queue:Queue) extend
while( remaining!=Nil && picked == null ) {
val sub = remaining.head
remaining = remaining.drop(1)
-
- if (sub.matches(offering)) {
+ if (sub.matches(delivery)) {
competingSlowSubs = competingSlowSubs ::: sub :: Nil
- if (sub.offer(offering)) {
- picked = sub
+
+ if( !sub.full ) {
+ val node = sub.add(QueueEntry.this)
+ val offering = delivery.copy
+ offering.ack = (tx)=> {
+ queue.ack_source.merge((node, tx))
+ }
+ if (sub.offer(offering)) {
+ picked = sub
+ }
}
+
} else {
competingFastSubs = competingFastSubs ::: sub :: Nil
}
@@ -598,12 +684,15 @@ class QueueEntry(val queue:Queue) extend
}
+
+class LinkedQueueEntry(val value:QueueEntry) extends LinkedNode[LinkedQueueEntry]
+
class Subscription(queue:Queue) extends DeliveryProducer {
def dispatchQueue = queue.dispatchQueue
- var dispatched = List[QueueEntry]()
- var session: Session = null
+ var dispatched = new LinkedNodeList[LinkedQueueEntry]
+ var session: DeliverySession = null
var pos:QueueEntry = null
def position(value:QueueEntry):Unit = {
@@ -625,38 +714,12 @@ class Subscription(queue:Queue) extends
}
def matches(entry:Delivery) = session.consumer.matches(entry)
+ def full = session.full
+ def offer(delivery:Delivery) = session.offer(delivery)
- def offer(delivery:Delivery) = {
- if (session.offer(delivery)) {
- if( delivery.ack!=null ) {
- val entry = delivery.ack.asInstanceOf[QueueEntry]
- dispatched = dispatched ::: entry :: Nil
- }
- true
- } else {
- false
- }
- }
-
- // called from the consumer thread.. send it to the ack_source
- // do that it calls _ack from the queue thread.
- override def ack(value: Any) = {
- val entry = value.asInstanceOf[QueueEntry]
- queue.ack_source.merge((this, entry))
- }
-
- def _ack(entry: QueueEntry): Unit = {
- assert(!dispatched.isEmpty)
- if (dispatched.head == entry) {
- // this should be the common case...
- dispatched = dispatched.drop(1)
- } else {
- // but lets also handle the case where we get an ack out of order.
- val rc = dispatched.partition(_ == entry)
- assert(rc._1.size == 1)
- dispatched = rc._2
- }
- queue.ack(entry)
+ def add(entry:QueueEntry) = {
+ val rc = new LinkedQueueEntry(entry)
+ dispatched.addLast(rc)
+ rc
}
-
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 03:59:44 2010
@@ -239,7 +239,7 @@ class DeliveryProducerRoute(val destinat
dispatchQueue.release
})
- var targets = List[Session]()
+ var targets = List[DeliverySession]()
def connected(targets:List[DeliveryConsumer]) = retaining(targets) {
internal_bind(targets)
@@ -289,7 +289,7 @@ class DeliveryProducerRoute(val destinat
//
var overflow:Delivery=null
- var overflowSessions = List[Session]()
+ var overflowSessions = List[DeliverySession]()
var refiller:Runnable=null
def full = overflow!=null
Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala (from r961116, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala&r1=961116&r2=961117&rev=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala Wed Jul 7 03:59:44 2010
@@ -18,7 +18,10 @@ package org.apache.activemq.apollo.broke
import _root_.java.util.{LinkedHashMap, HashMap}
-class TransactionManager() {
+
+
+
+class TransactionManagerX() {
var virtualHost:VirtualHost = null
@@ -390,7 +393,7 @@ class TransactionManager() {
* Keeps track of all the actions the need to be done when a transaction does a
* commit or rollback.
*/
-abstract class Transaction {
+abstract class TransactionX {
// TODO:
// private static final Log LOG = LogFactory.getLog(Transaction.class);
@@ -927,7 +930,7 @@ abstract class Transaction {
* @author cmacnaug
* @version 1.0
*/
-class LocalTransaction extends Transaction {
+class LocalTransactionX extends TransactionX {
// TODO:
// LocalTransaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
@@ -1028,7 +1031,7 @@ class LocalTransaction extends Transacti
* @author cmacnaug
* @version 1.0
*/
-class XATransaction extends Transaction {
+class XATransactionX extends TransactionX {
// TODO:
// private final Buffer xid;
//
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 03:59:44 2010
@@ -86,8 +86,8 @@ class VirtualHost(val broker: Broker) ex
this.names = names.toList
}
- var database:BrokerDatabase = new BrokerDatabase(this)
- var transactionManager:TransactionManager = new TransactionManager
+ var database:BrokerDatabase = new MemoryBrokerDatabase(this)
+ var transactionManager:TransactionManagerX = new TransactionManagerX
override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 7 03:59:44 2010
@@ -37,6 +37,8 @@ import StompConstants._;
import BufferConversions._
case class StompFrameMessage(frame:StompFrame) extends Message {
+
+ def protocol = "stomp"
/**
* the globally unique id of the message
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:59:44 2010
@@ -73,7 +73,7 @@ class StompProtocolHandler extends Proto
delivery.message.isInstanceOf[StompFrameMessage]
}
- def connect(p:DeliveryProducer) = new Session {
+ def connect(p:DeliveryProducer) = new DeliverySession {
retain
def producer = p
@@ -93,7 +93,7 @@ class StompProtocolHandler extends Proto
false
} else {
if( delivery.ack!=null ) {
- producer.ack(delivery.ack)
+ delivery.ack(null)
}
val frame = delivery.message.asInstanceOf[StompFrameMessage].frame
val rc = session.offer(frame)