You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:59:45 UTC

svn commit: r961117 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/

Author: chirino
Date: Wed Jul  7 03:59:44 2010
New Revision: 961117

URL: http://svn.apache.org/viewvc?rev=961117&view=rev
Log:
working on the store interfaces

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala
      - copied, changed from r961116, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala
Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala Wed Jul  7 03:59:44 2010
@@ -21,97 +21,147 @@ import _root_.org.fusesource.hawtdispatc
 import org.fusesource.hawtbuf._
 import org.apache.activemq.util.TreeMap
 import java.util.concurrent.atomic.{AtomicLong}
-import java.util.{HashSet}
 import collection.JavaConversions
-import org.fusesource.hawtdispatch.{BaseRetained, Retained}
+import java.util.{ArrayList, HashSet}
+import collection.mutable.HashMap
+import org.apache.activemq.Service
+import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained, Retained}
 
-case class StoredMessageRef(id:Long) extends BaseRetained
-
-class Record
-case class QueueRecord(val id:Long, val name:AsciiBuffer, val parent:AsciiBuffer, val config:String) extends Record
-class MessageRecord(val id:Long, val msgId:AsciiBuffer, val encoding: AsciiBuffer, val message:Buffer)  extends Record
-class QueueEntryRecord(val queue:Long, val seqId:Long, val msgId:Long) extends Record
-class SubscriptionRecord(val id:Long, val pk:AsciiBuffer, val selector:AsciiBuffer, val destination:AsciiBuffer, val durable:Boolean, val tte:Long, val attachment:Buffer) extends Record
-class Action
-case class CreateRecord(record:Record) extends Action
-case class UpdateRecord(record:Record) extends Action
-case class DeleteRecord(id:Long) extends Action
+case class QueueRecord(val id:Long, val name:AsciiBuffer, val parent:AsciiBuffer, val config:String)
+case class QueueInfo(record:QueueRecord, first:Long, last:Long, count:Int)
 
 /**
+ * A StoreTransaction is used to perform persistent
+ * operations as a unit of work.
+ *
+ * The disposer assigned to the store transaction will
+ * be executed once all associated persistent operations
+ * have been persisted.
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class BrokerDatabase(host:VirtualHost) {
+trait StoreTransaction extends Retained {
 
-  def start() ={
-  }
+  /**
+   * Assigns the delivery a store id if it did not already
+   * have one assigned.
+   */
+  def store(delivery:Delivery)
+
+  /**
+   * Adds a delivery to a specified queue at the specified position in the queue.
+   */
+  def enqueue(queue:Long, seq:Long, msg:Long)
+
+  /**
+   * Removes a delivery from a specified queue at the specified position in the queue.
+   */
+  def dequeue(queue:Long, seq:Long, msg:Long)
 
-  def stop() = {
-  }
+}
 
-  private object messages {
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait BrokerDatabase extends Service {
 
-    val dispatchQueue = createQueue("MessagesTable")
-    val messages = new TreeMap[Long, MessageRecord]
-    val inprogress = new HashSet[Long]()
 
-    def add(record:MessageRecord) = {
-      val id= record.id
+  /**
+   * Stores a queue, calls back with a unique id for the stored queue.
+   */
+  def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit
+
+  /**
+   * Loads the queue information for a given queue id.
+   */
+  def getQueueInfo(id:Long)(cb:(Option[QueueInfo])=>Unit )
+
+  /**
+   * gets a listing of all queues previously added.
+   */
+  def listQueues(cb: (Seq[Long])=>Unit )
+
+  /**
+   * Removes the delivery associated with the provided id from any
+   * internal buffers/caches.  The callback is executed once the message is
+   * no longer buffered.
+   */
+  def flushDelivery(id:Long)(cb: =>Unit)
+
+  /**
+   * Loads a delivery with the associated id from persistent storage.
+   */
+  def loadDelivery(id:Long)(cb:(Option[Delivery])=>Unit )
+
+  /**
+   * Creates a StoreTransaction which is used to perform persistent
+   * operations as a unit of work.
+   */
+  def createStoreTransaction():StoreTransaction
 
-      // the inprogress list protects the message from being
-      // gced too soon.  Protection ends once StoredMessageRef
-      // is disposed..
-      val ref = new StoredMessageRef(id) {
-        override def dispose = ^{
-          inprogress.remove(id)
-        } >>: dispatchQueue
-      }
+}
 
-      using(ref) {
-        inprogress.add(id)
-        messages.put(record.id, record)
-      } >>: dispatchQueue
+class Counter(private var value:Int = 0) {
 
-      ref
-    }
+  def get() = value
 
-    def get(id:Long, cb:(MessageRecord)=>Unit) = reply(cb) {
-      messages.get(id)
-    } >>: dispatchQueue
+  def incrementAndGet() = addAndGet(1)
+  def decrementAndGet() = addAndGet(-1)
+  def addAndGet(amount:Int) = {
+    value+=amount
+    value
+  }
 
+  def getAndIncrement() = getAndAdd(1)
+  def getAndDecrement() = getAndAdd(-1) // returns the current value, then subtracts one (mirror of getAndIncrement)
+  def getAndAdd(amount:Int) = {
+    val rc = value
+    value+=amount
+    rc
   }
 
-  private val msg_id_generator = new AtomicLong
+}
 
-  def createMessageRecord(msgId:AsciiBuffer, encoding:AsciiBuffer, message:Buffer) = {
-    val record = new MessageRecord(msg_id_generator.incrementAndGet, msgId, encoding, message)
-    messages.add(record)
-    StoredMessageRef(record.id)
+
+/**
+ *  @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class MemoryBrokerDatabase(host:VirtualHost) extends BaseService with BrokerDatabase {
+
+  val dispatchQueue = createQueue("MessagesTable")
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Methods related to Service interface impl
+  //
+  /////////////////////////////////////////////////////////////////////
+
+  protected def _stop(onCompleted: Runnable) = {
+    onCompleted.run
   }
 
-  case class QueueData(val record:QueueRecord) {
-    var messges = new TreeMap[Long, Long]()
+  protected def _start(onCompleted: Runnable) = {
+    onCompleted.run
   }
 
-  private object queues {
-    var _next_id = 0L;
-    def next_id = {
-      val rc = _next_id
-      _next_id += 1
-      rc
-    }
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Methods related to queue management
+  //
+  /////////////////////////////////////////////////////////////////////
+  private val queue_id_generator = new AtomicLong
+  val queues = new TreeMap[Long, QueueData]
 
-    val dispatchQueue = createQueue("QueuesTable")
-    val records = new TreeMap[Long, QueueData]
+  case class QueueData(val record:QueueRecord) {
+    var messges = new TreeMap[Long, Long]()
   }
 
-  case class QueueInfo(record:QueueRecord, first:Long, last:Long, count:Int)
-
   def listQueues(cb: (Seq[Long])=>Unit ) = reply(cb) {
-    JavaConversions.asSet(queues.records.keySet).toSeq
-  } >>: queues.dispatchQueue
+    JavaConversions.asSet(queues.keySet).toSeq
+  } >>: dispatchQueue
 
   def getQueueInfo(id:Long)(cb:(Option[QueueInfo])=>Unit ) = reply(cb) {
-    val qd = queues.records.get(id)
+    val qd = queues.get(id)
     if( qd == null ) {
       None
     } else {
@@ -123,27 +173,114 @@ class BrokerDatabase(host:VirtualHost) {
         }
       )
     }
-  } >>: queues.dispatchQueue
-
+  } >>: dispatchQueue
 
   def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit = reply(cb) {
-    val id = queues.next_id
-    if( queues.records.containsKey(id) ) {
+    val id = queue_id_generator.incrementAndGet
+    if( queues.containsKey(id) ) {
       None
     } else {
-      queues.records.put(id, QueueData(record))
+      queues.put(id, QueueData(record))
       Some(id)
     }
-  } >>: queues.dispatchQueue
+  } >>: dispatchQueue
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Methods related to message storage
+  //
+  /////////////////////////////////////////////////////////////////////
+  class MessageData(val delivery:Delivery) {
+    val queueRefs = new Counter()
+    var onFlush = List[()=>Unit]()
+  }
+
+  private val msg_id_generator = new AtomicLong
+  val messages = new TreeMap[Long, MessageData]
+
+  def flushDelivery(msg:Long)(cb: =>Unit) = ^{
+    val rc = messages.get(msg)
+    if( rc == null ) {
+      cb
+    } else {
+      rc.onFlush ::= cb _
+    }
+  } >>: dispatchQueue
+
+  def loadDelivery(ref:Long)(cb:(Option[Delivery])=>Unit ) = reply(cb) {
+    val rc = messages.get(ref)
+    if( rc == null ) {
+      None
+    } else {
+      Some(rc.delivery)
+    }
+  } >>: dispatchQueue
+
+  /////////////////////////////////////////////////////////////////////
+  //
+  // Methods related to store transactions
+  //
+  /////////////////////////////////////////////////////////////////////
+  val transactions = new HashSet[MemoryStoreTransaction]()
+
+  def createStoreTransaction() =  {
+    val tx = new MemoryStoreTransaction()
+    using(tx) {
+      transactions.add(tx)
+    } >>: dispatchQueue
+    tx
+  }
 
-  def addMessageToQueue(queue:Long, seq:Long, msg:StoredMessageRef) = using(msg) {
-    val qd = queues.records.get(queue);
-    qd.messges.put(seq, msg.id)
-  } >>: queues.dispatchQueue
-
-  def removeMessageFromQueue(queue:Long, seq:Long, retained:Retained) = using(retained) {
-    val qd = queues.records.get(queue);
-    qd.messges.remove(seq)
-  } >>: queues.dispatchQueue
+  class MemoryStoreTransaction extends BaseRetained with StoreTransaction  {
+
+    val updated = HashMap[Long, MessageData]()
+
+    def store(delivery:Delivery) = {
+      if( delivery.storeId == -1 ) {
+        delivery.storeId = msg_id_generator.incrementAndGet
+        using(this) {
+          val md = new MessageData(delivery)
+          updated.put(delivery.storeId, md)
+          messages.put(delivery.storeId, md)
+        } >>: dispatchQueue
+      }
+    }
+
+    def enqueue(queue:Long, seq:Long, msg:Long) = {
+      using(this) {
+        val qd = queues.get(queue)
+        if( qd!=null ) {
+          val md = updated.getOrElse(msg, messages.get(msg))
+          md.queueRefs.incrementAndGet
+          qd.messges.put(seq, msg)
+        }
+      } >>: dispatchQueue
+    }
+
+    def dequeue(queue:Long, seq:Long, msg:Long) = {
+      using(this) {
+        val qd = queues.get(queue)
+        if( qd!=null ) {
+          val md = updated.getOrElse(msg, messages.get(msg))
+          md.queueRefs.decrementAndGet
+          qd.messges.remove(seq)
+        }
+      } >>: dispatchQueue
+    }
+
+
+    override def dispose = {
+      dispatchQueue {
+        updated.foreach{ x=>
+          if( x._2.queueRefs.get == 0 ) {
+            messages.remove(x._1)
+            x._2.onFlush.foreach( _() )
+          }
+        }
+        transactions.remove(MemoryStoreTransaction.this)
+        super.dispose
+      }
+    }
+  }
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:59:44 2010
@@ -28,7 +28,6 @@ import org.fusesource.hawtbuf._
 trait DeliveryProducer {
 
   def dispatchQueue:DispatchQueue
-  def ack(value:Any) = {}
 
   def collocate(value:DispatchQueue):Unit = {
     if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
@@ -47,17 +46,17 @@ trait DeliveryProducer {
 trait DeliveryConsumer extends Retained {
   def dispatchQueue:DispatchQueue;
   def matches(message:Delivery):Boolean
-  def connect(producer:DeliveryProducer):Session
+  def connect(producer:DeliveryProducer):DeliverySession
 }
 
 /**
- * Before a derlivery producer can send Delivery objects to a delivery
+ * Before a delivery producer can send Delivery objects to a delivery
  * consumer, it creates a Delivery session which it uses to send
  * the deliveries over.
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait Session extends Sink[Delivery] {
+trait DeliverySession extends Sink[Delivery] {
   def producer:DeliveryProducer
   def consumer:DeliveryConsumer
   def close:Unit
@@ -108,6 +107,11 @@ trait Message {
    */
   def messageEvaluationContext:MessageEvaluationContext
 
+  /**
+   * The protocol encoding of the message.
+   */
+  def protocol:String
+
 }
 
 /**
@@ -133,35 +137,32 @@ class Delivery extends BaseRetained {
   var size:Int = 0
 
   /**
-   * the encoding format of the message
-   */
-  var encoding: String = null
-
-  /**
    *  the message being delivered
    */
   var message: Message = null
 
   /**
-   * the encoded form of the message being delivered.
+   * A reference to the stored version of the message.
    */
-  var encoded: Buffer = null
+  var storeId:Long = -1
 
-  var ref:StoredMessageRef = null
-
-  def copy() = (new Delivery).set(this)
+  /**
+   * The transaction the delivery is participating in.
+   */
+  var storeTx:StoreTransaction = null
 
   /**
-   * Set if the producer requires an ack to be sent back
+   * Set if the producer requires an ack to be sent back.  Consumer
+   * should execute once the message is processed.
    */
-  var ack:Any = null
+  var ack:(StoreTransaction)=>Unit = null
+
+  def copy() = (new Delivery).set(this)
 
   def set(other:Delivery) = {
     size = other.size
-    encoding = other.encoding
     message = other.message
-    encoded = other.encoded
-    ref = other.ref
+    storeId = other.storeId
     this
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 03:59:44 2010
@@ -22,8 +22,7 @@ import collection.{SortedMap}
 import org.fusesource.hawtdispatch.{ScalaDispatch, DispatchQueue, BaseRetained}
 import org.apache.activemq.util.TreeMap.TreeEntry
 import java.util.{Collections, ArrayList, LinkedList}
-import org.apache.activemq.util.list.LinkedNode
-import org.apache.activemq.util.list.LinkedNodeList
+import org.apache.activemq.util.list.{LinkedNodeList, LinkedNode}
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -67,10 +66,26 @@ class Queue(val host: VirtualHost, val d
     debug("created queue for: " + destination)
   }
   setDisposer(^ {
+    ack_source.release
     dispatchQueue.release
     session_manager.release
   })
 
+
+  val ack_source = createSource(new ListEventAggregator[(LinkedQueueEntry, StoreTransaction)](), dispatchQueue)
+  ack_source.setEventHandler(^ {drain_acks});
+  ack_source.resume
+
+  val store_load_source = createSource(new ListEventAggregator[(QueueEntry, Delivery)](), dispatchQueue)
+  store_load_source.setEventHandler(^ {drain_store_loads});
+  store_load_source.resume
+
+  val store_flush_source = createSource(new ListEventAggregator[QueueEntry](), dispatchQueue)
+  store_flush_source.setEventHandler(^ {drain_store_flushes});
+  store_flush_source.resume
+
+  val session_manager = new SinkMux[Delivery](messages, dispatchQueue, Delivery)
+
   // sequence numbers.. used to track what's in the store.
   var first_seq = -1L
   var last_seq = -1L
@@ -85,6 +100,8 @@ class Queue(val host: VirtualHost, val d
   val entries = new LinkedNodeList[QueueEntry]()
   entries.addFirst(headEntry)
 
+  var flushingSize = 0
+
   /**
    * Tunning options.
    */
@@ -94,148 +111,89 @@ class Queue(val host: VirtualHost, val d
 
   private var size = 0
 
-  def swap() = {
-
-    class Prio(val entry:QueueEntry) extends Comparable[Prio] {
-      var value = 0
-      def compareTo(o: Prio) = o.value - value
-    }
-
-    val prios = new ArrayList[Prio](count)
-
-    var entry = entries.getHead
-    while( entry!=null ) {
-      if( entry.value.asTombstone == null ) {
-        prios.add(new Prio(entry))
-      }
-      entry = entry.getNext
-    }
-
-
-    /**
-     * adds keep priority to the range of entries starting at x
-     * and spanning the size provided.
-     */
-    def prioritize(i:Int, size:Int, p:Int):Unit = {
-      val prio = prios.get(i)
-      prio.value += p
-      val remainingSize = size - prio.entry.value.size
-      if( remainingSize > 0 ) {
-        val next = i + 1
-        if( next < prios.size ) {
-          prioritize(next, remainingSize, p-1)
-        }
-      }
-    }
-
-    // Prioritize the entries so that higher priority entries are swapped in,
-    // and lower priority entries are swapped out.
-    var i = 0
-    while( i < prios.size ) {
-      val prio = prios.get(i)
-      if( prio.entry.hasSubs ) {
-
-        var credits =0;
-        if( prio.entry.competing != Nil) {
-          credits += prio.entry.competing.size * tune_subscription_prefetch
-        } else{
-          if( prio.entry.browsing != Nil ) {
-            credits += tune_subscription_prefetch
-          }
-        }
-        prioritize(i, credits, 1000)
-
-      }
-      i += 1
-    }
-
-    Collections.sort(prios)
-
-    var remaining = tune_max_size / 2
-    i = 0
-    while( i < prios.size ) {
-      val prio = prios.get(i)
-      val entry = prio.entry
-      if( remaining > 0 ) {
-        remaining -= entry.value.size
-      }
+  object messages extends Sink[Delivery] {
 
-      i += 1
-    }
-
-  }
-
-  object messages extends Sink[QueueEntry] {
     var refiller: Runnable = null
 
-    def full = size >= tune_max_size
-
-    def offer(value: QueueEntry): Boolean = {
+    def full = if(size >= tune_max_size)
+      true
+    else
+      false
 
+    def offer(delivery: Delivery): Boolean = {
       if (full) {
         false
       } else {
 
-        val ref = value.value.ref
-        if (ref != null) {
-          host.database.addMessageToQueue(storeId, value.seq, ref)
-          ref.release
+        val entry = tailEntry
+        tailEntry = new QueueEntry(Queue.this)
+        entry.created(next_message_seq, delivery)
+
+        if( delivery.ack!=null ) {
+          delivery.ack(delivery.storeTx)
+        }
+        if (delivery.storeId != -1) {
+          delivery.storeTx.enqueue(storeId, entry.seq, delivery.storeId)
+          delivery.storeTx.release
         }
 
-        size += value.value.size
-        entries.addLast(value)
+        size += entry.value.size
+        entries.addLast(entry)
         counter += 1;
 
-//        if( full ) {
+        if( full ) {
 //          swap
-//        }
+        }
 
-        if( value.hasSubs ) {
-          value.dispatch
+        if( entry.hasSubs ) {
+          entry.dispatch
         }
         true
       }
     }
   }
 
-  def ack(entry: QueueEntry) = {
+  def ack(entry: QueueEntry, tx:StoreTransaction) = {
 
-    if (entry.value.ref != null) {
-      host.database.removeMessageFromQueue(storeId, entry.seq, null)
+    if (entry.value.ref != -1) {
+      val transaction = if( tx == null ) {
+        host.database.createStoreTransaction
+      } else {
+        tx
+      }
+      transaction.dequeue(storeId, entry.seq, entry.value.ref)
+      if( tx == null ) {
+        transaction.release
+      }
+    }
+    if( tx != null ) {
+      tx.release
     }
 
     counter -= 1
     size -= entry.value.size
-
     entry.tombstone
 
     if (counter == 0) {
+//      trace("empty.. triggering refill")
       messages.refiller.run
     }
   }
 
 
-  def nack(values: List[QueueEntry]) = {
+  def nack(values: LinkedNodeList[LinkedQueueEntry]) = {
     // TODO:
-    for (v <- values) {
-    }
   }
-  
 
-  val session_manager = new SinkMux[Delivery](MapSink(messages) {x => accept(x)}, dispatchQueue, Delivery)
-
-  val ack_source = createSource(new ListEventAggregator[(Subscription, QueueEntry)](), dispatchQueue)
-  ack_source.setEventHandler(^ {drain_acks});
-  ack_source.resume
 
   def drain_acks = {
     ack_source.getData.foreach {
-      event =>
-        event._1._ack(event._2)
+      case (entry, tx) =>
+        entry.unlink
+        ack(entry.value, tx)
     }
   }
-
-
+  
   /////////////////////////////////////////////////////////////////////
   //
   // Implementation of the DeliveryConsumer trait.  Allows this queue
@@ -245,7 +203,7 @@ class Queue(val host: VirtualHost, val d
 
   def matches(message: Delivery) = {true}
 
-  def connect(p: DeliveryProducer) = new Session {
+  def connect(p: DeliveryProducer) = new DeliverySession {
     retain
 
     override def consumer = Queue.this
@@ -262,11 +220,25 @@ class Queue(val host: VirtualHost, val d
     // Delegate all the flow control stuff to the session
     def full = session.full
 
-    def offer(value: Delivery) = {
+    def offer(delivery: Delivery) = {
       if (session.full) {
         false
       } else {
-        val rc = session.offer(sent(value))
+
+        // Called from the producer thread before the delivery is
+        // processed by the queue's thread.. We don't
+        // yet know the order of the delivery in the queue.
+        if (delivery.storeId != -1) {
+          // If the message has a store id, then this delivery will
+          // need a tx to track the store changes.
+          if( delivery.storeTx == null ) {
+            delivery.storeTx = host.database.createStoreTransaction
+          } else {
+            delivery.storeTx.retain
+          }
+        }
+
+        val rc = session.offer(delivery)
         assert(rc, "session should accept since it was not full")
         true
       }
@@ -313,37 +285,140 @@ class Queue(val host: VirtualHost, val d
   //
   /////////////////////////////////////////////////////////////////////
 
-  /**
-   * Called from the producer thread before the delivery is
-   * processed by the queues' thread.. therefore we don't
-   * yet know the order of the delivery in the queue.
-   */
-  private def sent(delivery: Delivery) = {
-    if (delivery.ref != null) {
-      // retain the persistent ref so that the delivery is not
-      // considered completed until this queue stores it
-      delivery.ref.retain
-    }
-    delivery
+  private def next_message_seq = {
+    val rc = message_seq_counter
+    message_seq_counter += 1
+    rc
   }
 
-  /**
-   * Called from the queue thread.  At this point we
-   * know the order.  Converts the delivery to a QueueEntry
-   */
-  private def accept(delivery: Delivery) = {
-    val rc = tailEntry
-    tailEntry = new QueueEntry(this)
-    rc.loaded(next_message_seq, delivery)
+  def swap() = {
+    // Ranks live entries by subscriber demand, requests loads for the best ranked, flushes the rest.
+    class Prio(val entry:QueueEntry) extends Comparable[Prio] {
+      var value = 0
+      def compareTo(o: Prio) = o.value - value
+    }
+
+    val prios = new ArrayList[Prio](count)
+
+    var entry = entries.getHead
+    while( entry!=null ) {
+      if( entry.value.asTombstone == null ) {
+        prios.add(new Prio(entry))
+      }
+      entry = entry.getNext
+    }
+
+
+    /**
+     * adds keep priority to the range of entries starting at index i
+     * and spanning the size provided.
+     */
+    def prioritize(i:Int, size:Int, p:Int):Unit = {
+      val prio = prios.get(i)
+      prio.value += p
+      val remainingSize = size - prio.entry.value.size
+      if( remainingSize > 0 ) {
+        val next = i + 1
+        if( next < prios.size ) {
+          prioritize(next, remainingSize, p-1)
+        }
+      }
+    }
+
+    // Prioritize the entries so that higher priority entries are swapped in,
+    // and lower priority entries are swapped out.
+    var i = 0
+    while( i < prios.size ) {
+      val prio = prios.get(i)
+      if( prio.entry.hasSubs ) {
+
+        var credits =0;
+        if( prio.entry.competing != Nil) {
+          credits += prio.entry.competing.size * tune_subscription_prefetch
+        } else{
+          if( prio.entry.browsing != Nil ) {
+            credits += tune_subscription_prefetch
+          }
+        }
+        prioritize(i, credits, 1000)
+
+      }
+      i += 1
+    }
+
+    Collections.sort(prios)
+
+    var remaining = tune_max_size / 2
+    i = 0
+    while( i < prios.size ) {
+      val prio = prios.get(i)
+      val entry = prio.entry
+      if( remaining > 0 ) {
+        remaining -= entry.value.size
+        val stored = entry.value.asStored
+        if( stored!=null && !stored.loading) {
+          // start loading it back...
+          stored.loading = true
+          host.database.loadDelivery(stored.ref) { delivery =>
+            // pass off to a source so it can aggregate multiple
+            // loads to reduce cross thread synchronization
+            if( delivery.isDefined ) {
+              store_load_source.merge((entry, delivery.get))
+            }
+          }
+        }
+      } else {
+        // Chuck the rest out...
+        val loaded = entry.value.asLoaded
+        if( loaded!=null ) {
+          val delivery = loaded.delivery
+          if( delivery.storeId == -1 ) {
+            val tx = host.database.createStoreTransaction
+            tx.store(delivery)
+            tx.enqueue(storeId, entry.seq, delivery.storeId)
+            tx.release
+          }
+          flushingSize += entry.value.size
+          host.database.flushDelivery(delivery.storeId) { // read the id here: store() above may have just assigned it
+            store_flush_source.merge(entry)
+          }
+        }
+      }
+      i += 1
+    }
   }
 
+  def drain_store_loads() = {
+    val data = store_load_source.getData
+    var ready = List[QueueEntry]()
 
-  private def next_message_seq = {
-    val rc = message_seq_counter
-    message_seq_counter += 1
-    rc
+    data.foreach { event =>
+      val entry = event._1
+      entry.loaded(event._2)
+      size += entry.value.size
+
+      if( entry.hasSubs ) {
+        ready ::= entry
+      }
+    }
+
+    ready.foreach { entry =>
+      entry.dispatch
+    }
   }
 
+  def drain_store_flushes() = {
+    store_flush_source.getData.foreach { entry =>
+      flushingSize -= entry.value.size
+      // the flush requested for this entry has completed, so it is no longer in flight.
+      // by the time we get called back, subs may be interested in the entry
+      // or it may have been acked.
+      if( !entry.hasSubs && entry.value.asLoaded!=null ) {
+        size -= entry.value.size // entry leaves memory: reverse the += applied when it was offered / loaded
+        entry.stored
+      }
+    }
+  }
 
 }
 
@@ -364,12 +439,23 @@ class QueueEntry(val queue:Queue) extend
     (seq - o.seq).toInt
   }
 
-  def loaded(seq:Long, delivery:Delivery) = {
+  def created(seq:Long, delivery:Delivery) = {
     this.seq = seq
     this.value = new Loaded(delivery)
     this
   }
 
+  def loaded(delivery:Delivery) = {
+    this.value = new Loaded(delivery)
+    this
+  }
+
+  def stored() = {
+    val loaded = value.asLoaded
+    this.value = new Stored(loaded.delivery.storeId, loaded.size)
+    this
+  }
+
   def tombstone = {
     this.value = new Tombstone()
     if( seq != -1L ) {
@@ -465,7 +551,7 @@ class QueueEntry(val queue:Queue) extend
   trait EntryType {
     def size:Int
     def dispatch():QueueEntry
-    def ref:StoredMessageRef
+    def ref:Long
 
     def asTombstone:Tombstone = null
     def asStored:Stored = null
@@ -477,7 +563,7 @@ class QueueEntry(val queue:Queue) extend
     var count = 1L
 
     def size = 0
-    def ref = null
+    def ref = -1
 
     override def asTombstone = this
 
@@ -492,12 +578,9 @@ class QueueEntry(val queue:Queue) extend
     
   }
 
-  class Stored extends EntryType {
-
-    private var loading = false
+  class Stored(val ref:Long, val size:Int) extends EntryType {
 
-    var ref:StoredMessageRef = null
-    var size = 0
+    var loading = false
 
     override def asStored = this
 
@@ -511,8 +594,9 @@ class QueueEntry(val queue:Queue) extend
   class Loaded(val delivery: Delivery) extends EntryType {
 
     var aquired = false
-    def ref = delivery.ref
+    def ref = delivery.storeId
     def size = delivery.size
+    def flushing = false
     
     override  def asLoaded = this
 
@@ -529,8 +613,6 @@ class QueueEntry(val queue:Queue) extend
 
         if( browsing!=Nil ) {
           val offering = delivery.copy
-          offering.ack = null
-
           browsing.foreach { sub =>
             if (sub.matches(offering)) {
               if (sub.offer(offering)) {
@@ -545,10 +627,6 @@ class QueueEntry(val queue:Queue) extend
         }
 
         if( competing!=Nil ) {
-
-          val offering = delivery.copy
-          offering.ack = QueueEntry.this
-
           if (!this.aquired) {
             aquired = true
 
@@ -557,12 +635,20 @@ class QueueEntry(val queue:Queue) extend
             while( remaining!=Nil && picked == null ) {
               val sub = remaining.head
               remaining = remaining.drop(1)
-
-              if (sub.matches(offering)) {
+              if (sub.matches(delivery)) {
                 competingSlowSubs = competingSlowSubs ::: sub :: Nil
-                if (sub.offer(offering)) {
-                  picked = sub
+
+                if( !sub.full ) {
+                  val node = sub.add(QueueEntry.this)
+                  val offering = delivery.copy
+                  offering.ack = (tx)=> {
+                    queue.ack_source.merge((node, tx))
+                  }
+                  if (sub.offer(offering)) {
+                    picked = sub
+                  }
                 }
+
               } else {
                 competingFastSubs = competingFastSubs ::: sub :: Nil
               }
@@ -598,12 +684,15 @@ class QueueEntry(val queue:Queue) extend
 
 }
 
+
+class LinkedQueueEntry(val value:QueueEntry) extends LinkedNode[LinkedQueueEntry]
+
 class Subscription(queue:Queue) extends DeliveryProducer {
 
   def dispatchQueue = queue.dispatchQueue
 
-  var dispatched = List[QueueEntry]()
-  var session: Session = null
+  var dispatched = new LinkedNodeList[LinkedQueueEntry]
+  var session: DeliverySession = null
   var pos:QueueEntry = null
 
   def position(value:QueueEntry):Unit = {
@@ -625,38 +714,12 @@ class Subscription(queue:Queue) extends 
   }
 
   def matches(entry:Delivery) = session.consumer.matches(entry)
+  def full = session.full
+  def offer(delivery:Delivery) = session.offer(delivery)
 
-  def offer(delivery:Delivery) = {
-    if (session.offer(delivery)) {
-      if( delivery.ack!=null ) {
-        val entry = delivery.ack.asInstanceOf[QueueEntry]
-        dispatched = dispatched ::: entry :: Nil
-      }
-      true
-    } else {
-      false
-    }
-  }
-
-  // called from the consumer thread.. send it to the ack_source
-  // so that it calls _ack from the queue thread.
-  override def ack(value: Any) = {
-    val entry = value.asInstanceOf[QueueEntry]
-    queue.ack_source.merge((this, entry))
-  }
-
-  def _ack(entry: QueueEntry): Unit = {
-    assert(!dispatched.isEmpty)
-    if (dispatched.head == entry) {
-      // this should be the common case...
-      dispatched = dispatched.drop(1)
-    } else {
-      // but lets also handle the case where we get an ack out of order.
-      val rc = dispatched.partition(_ == entry)
-      assert(rc._1.size == 1)
-      dispatched = rc._2
-    }
-    queue.ack(entry)
+  def add(entry:QueueEntry) = {
+    val rc = new LinkedQueueEntry(entry)
+    dispatched.addLast(rc)
+    rc
   }
-
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 03:59:44 2010
@@ -239,7 +239,7 @@ class DeliveryProducerRoute(val destinat
     dispatchQueue.release
   })
 
-  var targets = List[Session]()
+  var targets = List[DeliverySession]()
 
   def connected(targets:List[DeliveryConsumer]) = retaining(targets) {
     internal_bind(targets)
@@ -289,7 +289,7 @@ class DeliveryProducerRoute(val destinat
   //
 
   var overflow:Delivery=null
-  var overflowSessions = List[Session]()
+  var overflowSessions = List[DeliverySession]()
   var refiller:Runnable=null
 
   def full = overflow!=null

Copied: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala (from r961116, activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala)
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala?p2=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala&p1=activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala&r1=961116&r2=961117&rev=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManager.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/TransactionManagerX.scala Wed Jul  7 03:59:44 2010
@@ -18,7 +18,10 @@ package org.apache.activemq.apollo.broke
 
 import _root_.java.util.{LinkedHashMap, HashMap}
 
-class TransactionManager() {
+
+
+
+class TransactionManagerX() {
 
   var virtualHost:VirtualHost = null
 
@@ -390,7 +393,7 @@ class TransactionManager() {
  * Keeps track of all the actions that need to be done when a transaction does a
  * commit or rollback.
  */
-abstract class Transaction {
+abstract class TransactionX {
 
 // TODO:
 //    private static final Log LOG = LogFactory.getLog(Transaction.class);
@@ -927,7 +930,7 @@ abstract class Transaction {
  * @author cmacnaug
  * @version 1.0
  */
-class LocalTransaction extends Transaction {
+class LocalTransactionX extends TransactionX {
 
 //    TODO:
 //    LocalTransaction(TransactionManager manager, long tid, IQueue<Long, TxOp> opQueue) {
@@ -1028,7 +1031,7 @@ class LocalTransaction extends Transacti
  * @author cmacnaug
  * @version 1.0
  */
-class XATransaction extends Transaction {
+class XATransactionX extends TransactionX {
 // TODO:
 //    private final Buffer xid;
 //

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 03:59:44 2010
@@ -86,8 +86,8 @@ class VirtualHost(val broker: Broker) ex
     this.names = names.toList
   }
 
-  var database:BrokerDatabase = new BrokerDatabase(this)
-  var transactionManager:TransactionManager = new TransactionManager
+  var database:BrokerDatabase = new MemoryBrokerDatabase(this)
+  var transactionManager:TransactionManagerX = new TransactionManagerX
 
   override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 03:59:44 2010
@@ -37,6 +37,8 @@ import StompConstants._;
 import BufferConversions._
 
 case class StompFrameMessage(frame:StompFrame) extends Message {
+  
+  def protocol = "stomp"
 
   /**
    * the globally unique id of the message

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961117&r1=961116&r2=961117&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:59:44 2010
@@ -73,7 +73,7 @@ class StompProtocolHandler extends Proto
       delivery.message.isInstanceOf[StompFrameMessage]
     }
 
-    def connect(p:DeliveryProducer) = new Session {
+    def connect(p:DeliveryProducer) = new DeliverySession {
       retain
 
       def producer = p
@@ -93,7 +93,7 @@ class StompProtocolHandler extends Proto
           false
         } else {
           if( delivery.ack!=null ) {
-            producer.ack(delivery.ack)
+            delivery.ack(null)
           }
           val frame = delivery.message.asInstanceOf[StompFrameMessage].frame
           val rc = session.offer(frame)