You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:58:43 UTC

svn commit: r961111 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/ activemq-dto/src/main/java/org/apache/activemq/ap...

Author: chirino
Date: Wed Jul  7 03:58:43 2010
New Revision: 961111

URL: http://svn.apache.org/viewvc?rev=961111&view=rev
Log:
starting to flesh out the persistence side of things

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
    activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:58:43 2010
@@ -269,7 +269,7 @@ object Queue extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
+class Queue(val destination:Destination, val storeId:Long) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
   override protected def log = Queue
 
   override val dispatchQueue:DispatchQueue = createQueue("queue:"+destination);
@@ -278,7 +278,33 @@ class Queue(val destination:Destination)
     debug("created queue for: "+destination)
   }
 
-  val delivery_buffer  = new DeliveryBuffer
+  // sequence numbers.. used to track what's in the store.
+  var first_seq = -1L
+  var last_seq = -1L
+  var message_seq_counter=0L
+  def next_message_seq = {
+    val rc = message_seq_counter
+    message_seq_counter += 1
+    rc
+  }
+
+  var count = 0
+
+  val delivery_buffer  = new DeliveryBuffer {
+
+    override def send(delivery: Delivery) = {
+      // Is it a persistent message?
+      if( delivery.ref!=null ) {
+        // next_message_seq
+
+      }
+      super.send(delivery)
+    }
+
+    override def ack(delivery: Delivery) = {
+      super.ack(delivery)
+    }
+  }
   delivery_buffer.eventHandler = ^{ drain_delivery_buffer }
 
   val session_manager = new DeliverySessionManager(delivery_buffer, dispatchQueue)
@@ -292,7 +318,7 @@ class Queue(val destination:Destination)
     var bound=true
 
     def deliver(value:Delivery):Unit = {
-      val delivery = Delivery(value)
+      val delivery = value.copy
       delivery.setDisposer(^{
         ^{ completed(value) } >>:dispatchQueue
       })
@@ -315,7 +341,7 @@ class Queue(val destination:Destination)
   def connected(consumers:List[DeliveryConsumer]) = bind(consumers)
   def bind(consumers:List[DeliveryConsumer]) = retaining(consumers) {
       for ( consumer <- consumers ) {
-        val cs = new ConsumerState(consumer.open_session(dispatchQueue))
+        val cs = new ConsumerState(consumer.connect(dispatchQueue))
         allConsumers += consumer->cs
         readyConsumers.addLast(cs)
       }
@@ -353,16 +379,16 @@ class Queue(val destination:Destination)
     }
   }
 
-  def open_session(producer_queue:DispatchQueue) = new DeliverySession {
+  def connect(producer_queue:DispatchQueue) = new DeliverySession {
 
-    val session = session_manager.session(producer_queue)
+    val session = session_manager.open(producer_queue)
     val consumer = Queue.this
     retain
 
     def deliver(delivery:Delivery) = session.send(delivery)
 
     def close = {
-      session.close
+      session_manager.close(session)
       release
     }
   }

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala?rev=961111&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala Wed Jul  7 03:58:43 2010
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.lang.{String}
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtbuf._
+import org.apache.activemq.util.TreeMap
+import java.util.concurrent.atomic.{AtomicLong}
+import java.util.{HashSet}
+import collection.JavaConversions
+
+class Record
+case class QueueRecord(val id:Long, val name:AsciiBuffer, val parent:AsciiBuffer, val config:String) extends Record
+class MessageRecord(val id:Long, val msgId:AsciiBuffer, val encoding: AsciiBuffer, val message:Buffer)  extends Record
+class QueueEntryRecord(val queue:Long, val seqId:Long, val msgId:Long) extends Record
+class SubscriptionRecord(val id:Long, val pk:AsciiBuffer, val selector:AsciiBuffer, val destination:AsciiBuffer, val durable:Boolean, val tte:Long, val attachment:Buffer) extends Record
+class Action
+case class CreateRecord(record:Record) extends Action
+case class UpdateRecord(record:Record) extends Action
+case class DeleteRecord(id:Long) extends Action
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class BrokerDatabase(host:VirtualHost) {
+
+  def start() ={
+  }
+
+  def stop() = {
+  }
+
+  private object messages {
+
+    val dispatchQueue = createQueue("MessagesTable")
+    val messages = new TreeMap[Long, MessageRecord]
+    val inprogress = new HashSet[Long]()
+
+    def add(record:MessageRecord) = {
+      val id= record.id
+
+      // the inprogress list protects the message from being
+      // gced too soon.  Protection ends once StoredMessageRef
+      // is disposed..
+      val ref = new StoredMessageRef(id) {
+        override def dispose = ^{
+          inprogress.remove(id)
+        } >>: dispatchQueue
+      }
+
+      using(ref) {
+        inprogress.add(id)
+        messages.put(record.id, record)
+      } >>: dispatchQueue
+
+      ref
+    }
+
+    def get(id:Long, cb:(MessageRecord)=>Unit) = reply(cb) {
+      messages.get(id)
+    } >>: dispatchQueue
+
+  }
+
+  private val msg_id_generator = new AtomicLong
+  def createMessageRecord(msgId:AsciiBuffer, encoding:AsciiBuffer, message:Buffer) = {
+    val record = new MessageRecord(msg_id_generator.incrementAndGet, msgId, encoding, message)
+    messages.add(record)
+  }
+
+
+
+  case class QueueData(val record:QueueRecord) {
+    var messges:List[Long] = Nil
+  }
+
+  private object queues {
+    var _next_id = 0L;
+    def next_id = {
+      val rc = _next_id
+      _next_id += 1
+      rc
+    }
+
+    val dispatchQueue = createQueue("QueuesTable")
+    val records = new TreeMap[Long, QueueData]
+  }
+
+  case class QueueInfo(record:QueueRecord, first:Long, last:Long, count:Int)
+
+  def listQueues(cb: (Seq[Long])=>Unit ) = reply(cb) {
+    JavaConversions.asSet(queues.records.keySet).toSeq
+  } >>: queues.dispatchQueue
+
+  def getQueueInfo(id:Long)(cb:(Option[QueueInfo])=>Unit ) = reply(cb) {
+    val qd = queues.records.get(id)
+    if( qd == null ) {
+      None
+    } else {
+      Some(
+        if( qd.messges.isEmpty ) {
+          QueueInfo(qd.record, -1, -1, 0)
+        } else {
+          QueueInfo(qd.record, qd.messges.head, qd.messges.last, qd.messges.size)
+        }
+      )
+    }
+  } >>: queues.dispatchQueue
+
+
+  def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit = reply(cb) {
+    val id = queues.next_id
+    if( queues.records.containsKey(id) ) {
+      None
+    } else {
+      queues.records.put(id, QueueData(record))
+      Some(id)
+    }
+  } >>: queues.dispatchQueue
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:58:43 2010
@@ -16,15 +16,19 @@
  */
 package org.apache.activemq.apollo.broker
 
-import _root_.java.util.{LinkedList}
 import _root_.org.apache.activemq.filter.{MessageEvaluationContext}
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch._
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
 import org.apache.activemq.transport.Transport
 import org.fusesource.hawtbuf._
+import org.apache.activemq.util.TreeMap
+import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
+import java.util.{HashSet, LinkedList}
 
 /**
+ * A producer which sends Delivery objects to a delivery consumer.
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait DeliveryProducer {
@@ -32,23 +36,30 @@ trait DeliveryProducer {
 }
 
 /**
+ * The delivery consumer accepts messages from a delivery producer.
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait DeliverySession {
-  val consumer:DeliveryConsumer
-  def deliver(delivery:Delivery)
-  def close:Unit
+trait DeliveryConsumer extends Retained {
+  def matches(message:Delivery)
+  val dispatchQueue:DispatchQueue;
+  def connect(producer:DispatchQueue):DeliverySession
 }
 
 /**
+ * Before a delivery producer can send Delivery objects to a delivery
+ * consumer, it creates a Delivery session which it uses to send
+ * the deliveries over.
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait DeliveryConsumer extends Retained {
-  def matches(message:Delivery)
-  val dispatchQueue:DispatchQueue;
-  def open_session(producer_queue:DispatchQueue):DeliverySession
+trait DeliverySession {
+  val consumer:DeliveryConsumer
+  def deliver(delivery:Delivery)
+  def close:Unit
 }
 
+
 /**
  * Abstracts wire protocol message implementations.  Each wire protocol
  * will provide it's own type of Message.
@@ -95,311 +106,68 @@ trait Message {
 
 }
 
+case class StoredMessageRef(id:Long) extends BaseRetained
+
 /**
+ * <p>
+ * A new Delivery object is created every time a message is transferred between a producer and
+ * its consumer or consumers.  Consumers will retain the object to flow control the producer.
+ * </p>
+ * <p>
+ * Once this object is disposed, the producer is free to send more deliveries to the consumers.
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 object Delivery {
-  def apply(o:Delivery) = new Delivery(o.message, o.size, o.encoded, o.encoding, o.ack, o.tx_id, o.store_id)
+//  def apply(o:Delivery) = {
+//    rc new Delivery();
+//    o.message, o.size, o.encoded, o.encoding, o.ack, o.tx_id, o.store_id)
+//  }
 }
 
-case class Delivery (
 
-  /**
-   *  the message being delivered
-   */
-  message: Message,
+class Delivery extends BaseRetained {
 
   /**
    * memory size of the delivery.  Used for resource allocation tracking
    */
-  size:Int,
+  var size:Int = 0
 
   /**
-   * the encoded form of the message being delivered.
+   * the encoding format of the message
    */
-  encoded: Buffer = null,
+  var encoding: String = null
 
   /**
-   * the encoding format of the message
+   *  the message being delivered
    */
-  encoding: String = null,
+  var message: Message = null
 
   /**
-   *  true if this delivery requires acknowledgment.
+   * the encoded form of the message being delivered.
    */
-  ack:Boolean = false,
+  var encoded: Buffer = null
 
-  /**
-   * The id used to identify the transaction that the message
-   * belongs to.
-   */
-  tx_id:Long = -1,
-
-  /**
-   * The id used to identify this message in the message
-   * store.
-   *
-   * @return The store tracking or -1 if not set.
-   */
-  store_id: Long = -1
-
-) extends BaseRetained {
-
-}
-
-//abstract class BrokerMessageDelivery extends MessageDelivery {
-// TODO:
-//    // True while the message is being dispatched to the delivery targets:
-//    boolean dispatching = false;
-//
-//    // A non null pending save indicates that the message is the
-//    // saver queue and that the message
-//    OperationContext<?> pendingSave;
-//
-//    // List of persistent targets for which the message should be saved
-//    // when dispatch is complete:
-//    HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
-//    SaveableQueueElement<MessageDelivery> singleTarget;
-//
-//    long storeTracking = -1;
-//    BrokerDatabase store;
-//    boolean fromStore = false;
-//    boolean enableFlushDelay = true;
-//    private int limiterSize = -1;
-//    private long tid=-1;
-//
-//    public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) {
-//        fromStore = true;
-//        store = database;
-//        storeTracking = mRecord.getKey();
-//        limiterSize = mRecord.getSize();
-//    }
-//
-//    public final int getFlowLimiterSize() {
-//        if (limiterSize == -1) {
-//            limiterSize = getMemorySize();
-//        }
-//        return limiterSize;
-//    }
-//
-//    /**
-//     * When an application wishes to include a message in a broker transaction
-//     * it must set this the tid returned by {@link Transaction#getTid()}
-//     *
-//     * @param tid
-//     *            Sets the tid used to identify the transaction at the broker.
-//     */
-//    public void setTransactionId(long tid) {
-//        this.tid = tid;
-//    }
-//
-//    /**
-//     * @return The tid used to identify the transaction at the broker.
-//     */
-//    public final long getTransactionId() {
-//        return tid;
-//    }
-//
-//    public final void clearTransactionId() {
-//        tid = -1;
-//    }
-//
-//    /**
-//     * Subclass must implement this to return their current memory size
-//     * estimate.
-//     *
-//     * @return The memory size of the message.
-//     */
-//    public abstract int getMemorySize();
-//
-//    public final boolean isFromStore() {
-//        return fromStore;
-//    }
-//
-//    public final void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable) {
-//        synchronized (this) {
-//            // Can flush of this message to the store be delayed?
-//            if (enableFlushDelay && !delayable) {
-//                enableFlushDelay = false;
-//            }
-//            // If this message is being dispatched then add the queue to the
-//            // list of queues for which to save the message when dispatch is
-//            // finished:
-//            if (dispatching) {
-//                addPersistentTarget(sqe);
-//                return;
-//            }
-//            // Otherwise, if it is still in the saver queue, we can add this
-//            // queue to the queue list:
-//            else if (pendingSave != null) {
-//                addPersistentTarget(sqe);
-//                if (!delayable) {
-//                    pendingSave.requestFlush();
-//                }
-//                return;
-//            }
-//        }
-//
-//        store.saveMessage(sqe, controller, delayable);
-//    }
-//
-//    public final void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
-//        boolean firePersistListener = false;
-//        boolean deleted = false;
-//        synchronized (this) {
-//            // If the message hasn't been saved to the database
-//            // then we don't need to issue a delete:
-//            if (dispatching || pendingSave != null) {
-//
-//                deleted = true;
-//
-//                removePersistentTarget(sqe.getQueueDescriptor());
-//                // We get a save context when we place the message in the
-//                // database queue. If it has been added to the queue,
-//                // and we've removed the last queue, see if we can cancel
-//                // the save:
-//                if (pendingSave != null && !hasPersistentTargets()) {
-//                    if (pendingSave.cancel()) {
-//                        pendingSave = null;
-//                        if (isPersistent()) {
-//                            firePersistListener = true;
-//                        }
-//                    }
-//                }
-//            }
-//        }
-//
-//        if (!deleted) {
-//            store.deleteQueueElement(sqe);
-//        }
-//
-//        if (firePersistListener) {
-//            onMessagePersisted();
-//        }
-//
-//    }
-//
-//    public final void setStoreTracking(long tracking) {
-//        if (storeTracking == -1) {
-//            storeTracking = tracking;
-//        }
-//    }
-//
-//    public final void beginDispatch(BrokerDatabase database) {
-//        this.store = database;
-//        dispatching = true;
-//        setStoreTracking(database.allocateStoreTracking());
-//    }
-//
-//    public long getStoreTracking() {
-//        return storeTracking;
-//    }
-//
-//    public synchronized Collection<SaveableQueueElement<MessageDelivery>> getPersistentQueues() {
-//        if (singleTarget != null) {
-//            ArrayList<SaveableQueueElement<MessageDelivery>> list = new ArrayList<SaveableQueueElement<MessageDelivery>>(1);
-//            list.add(singleTarget);
-//            return list;
-//        } else if (persistentTargets != null) {
-//            return persistentTargets.values();
-//        }
-//        return null;
-//    }
-//
-//    public void beginStore() {
-//        synchronized (this) {
-//            pendingSave = null;
-//        }
-//    }
-//
-//    private final boolean hasPersistentTargets() {
-//        return (persistentTargets != null && !persistentTargets.isEmpty()) || singleTarget != null;
-//    }
-//
-//    private final void removePersistentTarget(QueueDescriptor queue) {
-//        if (persistentTargets != null) {
-//            persistentTargets.remove(queue);
-//            return;
-//        }
-//
-//        if (singleTarget != null && singleTarget.getQueueDescriptor().equals(queue)) {
-//            singleTarget = null;
-//        }
-//    }
-//
-//    private final void addPersistentTarget(SaveableQueueElement<MessageDelivery> elem) {
-//        if (persistentTargets != null) {
-//            persistentTargets.put(elem.getQueueDescriptor(), elem);
-//            return;
-//        }
-//
-//        if (singleTarget == null) {
-//            singleTarget = elem;
-//            return;
-//        }
-//
-//        if (elem.getQueueDescriptor() != singleTarget.getQueueDescriptor()) {
-//            persistentTargets = new HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
-//            persistentTargets.put(elem.getQueueDescriptor(), elem);
-//            persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget);
-//            singleTarget = null;
-//        }
-//    }
-//
-//    public final void finishDispatch(ISourceController<?> controller) throws IOException {
-//        boolean firePersistListener = false;
-//        synchronized (this) {
-//            // If any of the targets requested save then save the message
-//            // Note that this could be the case even if the message isn't
-//            // persistent if a target requested that the message be spooled
-//            // for some other reason such as queue memory overflow.
-//            if (hasPersistentTargets()) {
-//                pendingSave = store.persistReceivedMessage(this, controller);
-//            }
-//
-//            // If none of the targets required persistence, then fire the
-//            // persist listener:
-//            if (pendingSave == null || !isPersistent()) {
-//                firePersistListener = true;
-//            }
-//            dispatching = false;
-//        }
-//
-//        if (firePersistListener) {
-//            onMessagePersisted();
-//        }
-//    }
-//
-//    public final MessageRecord createMessageRecord() {
-//
-//        MessageRecord record = new MessageRecord();
-//        record.setEncoding(getStoreEncoding());
-//        record.setBuffer(getStoreEncoded());
-//        record.setStreamKey((long) 0);
-//        record.setMessageId(getMsgId());
-//        record.setSize(getFlowLimiterSize());
-//        record.setKey(getStoreTracking());
-//        return record;
-//    }
-//
-//    /**
-//     * @return A buffer representation of the message to be stored in the store.
-//     * @throws
-//     */
-//    protected abstract Buffer getStoreEncoded();
-//
-//    /**
-//     * @return The encoding scheme used to store the message.
-//     */
-//    protected abstract AsciiBuffer getStoreEncoding();
-//
-//    public boolean isFlushDelayable() {
-//        // TODO Auto-generated method stub
-//        return enableFlushDelay;
-//    }
-//}
+  var ref:StoredMessageRef = null
+
+  def copy() = (new Delivery).set(this)
+
+  def set(other:Delivery) = {
+    size = other.size
+    encoding = other.encoding
+    message = other.message
+    encoded = other.encoded
+    ref = other.ref
+    this
+  }
+
+}
 
 /**
+ * <p>
+ * Defines the interface for objects which you can send flow controlled deliveries to.
+ * <p>
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait DeliverySink {
@@ -408,6 +176,11 @@ trait DeliverySink {
 }
 
 /**
+ * <p>
+ * A delivery sink which is connected to a transport. It expects the caller's dispatch
+ * queue to be the same as the transport's.
+ * <p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class TransportDeliverySink(var transport:Transport) extends DeliverySink {
@@ -416,6 +189,21 @@ class TransportDeliverySink(var transpor
 }
 
 /**
+ * <p>
+ * A delivery sink which buffers deliveries sent to it up to its
+ * maxSize setting, after which it starts flow controlling the sender.
+ * <p>
+ *
+ * <p>
+ * It executes the eventHandler when it has buffered deliveries.  The event handler
+ * should call receive to get the queued deliveries and then ack the delivery
+ * once it has been processed.  The producer will not be resumed until
+ * the ack occurs.
+ * <p>
+ *
+ * <p>
+ * This class should only be called from a single serial dispatch queue.
+ * <p>
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
@@ -457,6 +245,10 @@ class DeliveryBuffer(var maxSize:Int=102
 }
 
 /**
+ * Implements a delivery sink which sends to a 'down stream' delivery sink. If the
+ * down stream delivery sink is full, this sink buffers the overflow deliveries. So that the
+ * down stream sink does not need to worry about overflow.
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class DeliveryOverflowBuffer(val delivery_buffer:DeliverySink) extends DeliverySink {
@@ -483,7 +275,7 @@ class DeliveryOverflowBuffer(val deliver
   }
 
   protected def send_to_delivery_buffer(value:Delivery) = {
-    var delivery = Delivery(value)
+    var delivery = value.copy
     delivery.setDisposer(^{
       drainOverflow
     })
@@ -496,11 +288,27 @@ class DeliveryOverflowBuffer(val deliver
 }
 
 /**
+ * <p>
+ * A DeliverySessionManager manages multiple credit based
+ * transmission windows from multiple producers to a single consumer.
+ * </p>
+ *
+ * <p>
+ * Producers and consumers are typically executing on different threads and
+ * the overhead of doing cross thread flow control is much higher than if
+ * a credit window is used.  The credit window allows the producer to
+ * send multiple messages to the consumer without needing to wait for
+ * consumer events.  Only when the producer runs out of credit does the
+ * session start buffering the producer's overflow.  This class makes heavy
+ * use of Dispatch Source objects to coalesce cross thread events
+ * like the sending of messages or credits.
+ * </p>
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class DeliverySessionManager(val sink:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
+class DeliverySessionManager(val targetSink:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
 
-  var sessions = List[SessionServer]()
+  var sessions = List[Session]()
 
   var session_min_credits = 1024*4;
   var session_credit_capacity = 1024*32
@@ -513,6 +321,7 @@ class DeliverySessionManager(val sink:De
   })
 
   // use a event aggregating source to coalesce multiple events from the same thread.
+  // all the sessions send to the same source.
   val source = createSource(new ListEventAggregator[Delivery](), queue)
   source.setEventHandler(^{drain_source});
   source.resume
@@ -520,92 +329,107 @@ class DeliverySessionManager(val sink:De
   def drain_source = {
     val deliveries = source.getData
     deliveries.foreach { delivery=>
-      sink.send(delivery)
+      targetSink.send(delivery)
       delivery.release
     }
   }
 
-  class SessionServer(val producer_queue:DispatchQueue) {
-    private var _capacity = 0
-
-    def capacity(value:Int) = {
-      val change = value - _capacity;
-      _capacity = value;
-      client.credit(change)
-    }
+  /**
+   * tracks one producer to consumer session / credit window.
+   */
+  class Session(val producer_queue:DispatchQueue) extends DeliveryOverflowBuffer(targetSink)  {
 
-    def drain(callback:Runnable) = {
-      client.drain(callback)
+    // retain since the producer will be using this source to send messages
+    // to the consumer
+    source.retain
+
+    ///////////////////////////////////////////////////
+    // These members are used from the context of the
+    // producer serial dispatch queue
+    ///////////////////////////////////////////////////
+
+    // create a source to coalesce credit events back to the producer side...
+    val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
+    credit_adder.setEventHandler(^{
+      internal_credit(credit_adder.getData.intValue)
+    });
+    credit_adder.resume
+
+    private var credits = 0;
+
+    private var closed = false
+
+    def close = {
+      credit_adder.release
+      source.release
+      closed=true
     }
 
-    val client = new SessionClient()
+    override def full = credits <= 0
 
-    class SessionClient() extends DeliveryOverflowBuffer(sink) {
-
-      val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
-      credit_adder.setEventHandler(^{
-        internal_credit(credit_adder.getData.intValue)
-      });
-      credit_adder.resume
-      source.retain
-      var closed = false
-
-      private var credits = 0;
-
-      ///////////////////////////////////////////////////
-      // These methods get called from the client/producer thread...
-      ///////////////////////////////////////////////////
-      def close = {
-        credit_adder.release
-        source.release
-        closed=true
+    override def send(delivery: Delivery) = {
+      // retain the storage ref.. the target sink should
+      // release once it no longer needs it.
+      if( delivery.ref !=null ) {
+        delivery.ref.retain
       }
+      super.send(delivery)
+    }
 
-      override def full = credits <= 0
-
-      override protected def send_to_delivery_buffer(value:Delivery) = {
-        if( !closed ) {
-          var delivery = Delivery(value)
-          credit_adder.retain
-          delivery.setDisposer(^{
-            // This is called from the server/consumer thread
-            credit_adder.merge(delivery.size);
-            credit_adder.release
-          })
-          internal_credit(-delivery.size)
-          source.merge(delivery)
-        }
+    override protected def send_to_delivery_buffer(value:Delivery) = {
+      if( !closed ) {
+        var delivery = value.copy
+        credit_adder.retain
+        delivery.setDisposer(^{
+          // once the delivery is received by the consumer, event
+          // the producer the credit.
+          credit_adder.merge(delivery.size);
+          credit_adder.release
+        })
+        internal_credit(-delivery.size)
+        // use the source to send the consumer a delivery event.
+        source.merge(delivery)
       }
+    }
 
-      def internal_credit(value:Int) = {
-        credits += value;
-        if( closed || credits <= 0 ) {
-          credits = 0
-        } else {
-          drainOverflow
-        }
+    def internal_credit(value:Int) = {
+      credits += value;
+      if( closed || credits <= 0 ) {
+        credits = 0
+      } else {
+        drainOverflow
       }
+    }
 
-      ///////////////////////////////////////////////////
-      // These methods get called from the server/consumer thread...
-      ///////////////////////////////////////////////////
-      def credit(value:Int) = ^{ internal_credit(value) } >>: producer_queue
+    ///////////////////////////////////////////////////
+    // These members are used from the context of the
+    // consumer serial dispatch queue
+    ///////////////////////////////////////////////////
 
-      def drain(callback:Runnable) = {
-        credits = 0
-        if( callback!=null ) {
-          queue << callback
-        }
-      }
+    private var _capacity = 0
+
+    def credit(value:Int) = ^{
+      internal_credit(value)
+    } >>: producer_queue
+
+    def capacity(value:Int) = {
+      val change = value - _capacity;
+      _capacity = value;
+      credit(change)
     }
+
   }
 
-  def session(producer_queue:DispatchQueue) = {
-    val session = new SessionServer(producer_queue)
+  def open(producer_queue:DispatchQueue):DeliverySink = {
+    val session = createSession(producer_queue)
     sessions = session :: sessions
     session.capacity(session_max_credits)
-    session.client
+    session
   }
 
+  def close(session:DeliverySink) = {
+    session.asInstanceOf[DeliverySessionManager#Session].close
+  }
 
+  protected def createSession(producer_queue:DispatchQueue) = new Session(producer_queue)
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala Wed Jul  7 03:58:43 2010
@@ -23,11 +23,12 @@ import BufferConversions._
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 class ParserOptions {
-  var defaultDomain:AsciiBuffer = null
-  var queuePrefix:AsciiBuffer = null
-  var topicPrefix:AsciiBuffer = null
-  var tempQueuePrefix:AsciiBuffer = null
-  var tempTopicPrefix:AsciiBuffer = null
+  var defaultDomain: AsciiBuffer = null // domain DestinationParser.parse falls back to when no prefix matches; null makes parse return null
+  var queuePrefix: AsciiBuffer = null // name prefix mapped to Domain.QUEUE_DOMAIN; null disables the match in parse
+  var topicPrefix: AsciiBuffer = null // name prefix mapped to Domain.TOPIC_DOMAIN; null disables the match in parse
+  var tempQueuePrefix: AsciiBuffer = null // name prefix mapped to Domain.TEMP_QUEUE_DOMAIN; null disables the match in parse
+  var tempTopicPrefix: AsciiBuffer = null // name prefix mapped to Domain.TEMP_TOPIC_DOMAIN; null disables the match in parse
+  var separator: Option[Byte] = None // byte that splits composite destinations; None turns composite parsing off
 }
 
 /**
@@ -35,83 +36,107 @@ class ParserOptions {
  */
 object DestinationParser {
 
-    /**
-     * Parses a simple destination.
-     *
-     * @param value
-     * @param options
-     * @return
-     */
-    def parse(value:AsciiBuffer, options:ParserOptions ):Destination = {
-        if (options.queuePrefix!=null && value.startsWith(options.queuePrefix)) {
-            var name = value.slice(options.queuePrefix.length, value.length).ascii();
-            return new SingleDestination(Domain.QUEUE_DOMAIN, name);
-        } else if (options.topicPrefix!=null && value.startsWith(options.topicPrefix)) {
-            var name = value.slice(options.topicPrefix.length, value.length).ascii();
-            return new SingleDestination(Domain.TOPIC_DOMAIN, name);
-        } else if (options.tempQueuePrefix!=null && value.startsWith(options.tempQueuePrefix)) {
-            var name = value.slice(options.tempQueuePrefix.length, value.length).ascii();
-            return new SingleDestination(Domain.TEMP_QUEUE_DOMAIN, name);
-        } else if (options.tempTopicPrefix!=null && value.startsWith(options.tempTopicPrefix)) {
-            var name = value.slice(options.tempTopicPrefix.length, value.length).ascii();
-            return new SingleDestination(Domain.TEMP_TOPIC_DOMAIN, name);
-        } else {
-            if( options.defaultDomain==null ) {
-                return null;
+  def toBuffer(value: Destination, options: ParserOptions): AsciiBuffer = { // encodes value as <domain prefix><name>; null in, null out
+    if (value == null) {
+      null
+    } else {
+      val baos = new ByteArrayOutputStream
+      def write(value: Destination):Unit = {
+        if (value.getDestinations != null) {
+          assert( options.separator.isDefined )
+          // the separator goes between composite members, never before the first one
+          for ((d, i) <- value.getDestinations.zipWithIndex) {
+            if (i > 0) {
+              baos.write(options.separator.get)
             }
-            return new SingleDestination(options.defaultDomain, value);
+            write(d)
+          }
+        } else {
+          value.getDomain match { // NOTE(review): no default case, a domain outside these four raises MatchError
+            case Domain.QUEUE_DOMAIN =>
+              baos.write(options.queuePrefix)
+            case Domain.TOPIC_DOMAIN =>
+              baos.write(options.topicPrefix)
+            case Domain.TEMP_QUEUE_DOMAIN =>
+              baos.write(options.tempQueuePrefix)
+            case Domain.TEMP_TOPIC_DOMAIN =>
+              baos.write(options.tempTopicPrefix)
+          }
+          baos.write(value.getName)
         }
+      }
+      write(value)
+      baos.toBuffer.ascii
     }
+  }
 
-    /**
-     * Parses a destination which may or may not be a composite.
-     *
-     * @param value
-     * @param options
-     * @param compositeSeparator
-     * @return
-     */
-    def parse(value:AsciiBuffer, options:ParserOptions , compositeSeparator:Byte ):Destination = {
-        if( value == null ) {
-            return null;
-        }
+  /**
+   * Parses a destination which may or may not be a composite.
+   *
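+   * @param value the encoded destination name; a null value yields null
+   * @param options the domain prefixes, the default domain and the optional
+   *        composite separator (options.separator) used to split value
+   * @return the parsed destination, or null when a name matches no prefix and no default domain is set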
+   */
+  def parse(value: AsciiBuffer, options: ParserOptions): Destination = {
+    if (value == null) {
+      return null;
+    }
 
-        if( value.contains(compositeSeparator) ) {
-            var rc = value.split(compositeSeparator);
-            var dl:List[Destination] = Nil
-            for (buffer <- rc) {
-              val d = parse(buffer, options)
-              if( d==null ) {
-                return null;
-              }
-              dl = dl ::: d :: Nil
-            }
-            return new MultiDestination(dl.toArray[Destination]);
+    if (options.separator.isDefined && value.contains(options.separator.get)) {
+      var rc = value.split(options.separator.get);
+      var dl: List[Destination] = Nil
+      for (buffer <- rc) {
+        val d = parse(buffer, options)
+        if (d == null) {
+          return null;
+        }
+        dl = dl ::: d :: Nil
+      }
+      return new MultiDestination(dl.toArray[Destination]);
+    } else {
+      if (options.queuePrefix != null && value.startsWith(options.queuePrefix)) {
+        var name = value.slice(options.queuePrefix.length, value.length).ascii();
+        return new SingleDestination(Domain.QUEUE_DOMAIN, name);
+      } else if (options.topicPrefix != null && value.startsWith(options.topicPrefix)) {
+        var name = value.slice(options.topicPrefix.length, value.length).ascii();
+        return new SingleDestination(Domain.TOPIC_DOMAIN, name);
+      } else if (options.tempQueuePrefix != null && value.startsWith(options.tempQueuePrefix)) {
+        var name = value.slice(options.tempQueuePrefix.length, value.length).ascii();
+        return new SingleDestination(Domain.TEMP_QUEUE_DOMAIN, name);
+      } else if (options.tempTopicPrefix != null && value.startsWith(options.tempTopicPrefix)) {
+        var name = value.slice(options.tempTopicPrefix.length, value.length).ascii();
+        return new SingleDestination(Domain.TEMP_TOPIC_DOMAIN, name);
+      } else {
+        if (options.defaultDomain == null) {
+          return null;
         }
-        return parse(value, options);
+        return new SingleDestination(options.defaultDomain, value);
+      }
     }
+  }
 }
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-case class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
+case class SingleDestination(var domain: AsciiBuffer = null, var name: AsciiBuffer = null) extends Destination {
+  def getDestinations(): Array[Destination] = null;
+  def getDomain(): AsciiBuffer = domain
 
-  def getDestinations():Array[Destination] = null;
-  def getDomain():AsciiBuffer = domain
-  def getName():AsciiBuffer = name
+  def getName(): AsciiBuffer = name
 
-  override def toString() = ""+domain+":"+name
+  override def toString() = "" + domain + ":" + name
 }
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-case class MultiDestination(var destinations:Array[Destination]) extends Destination {
+case class MultiDestination(var destinations: Array[Destination]) extends Destination {
+  def getDestinations(): Array[Destination] = destinations;
+  def getDomain(): AsciiBuffer = null
 
-  def getDestinations():Array[Destination] = destinations;
-  def getDomain():AsciiBuffer = null
-  def getName():AsciiBuffer = null
+  def getName(): AsciiBuffer = null
 
   override def toString() = destinations.mkString(",")
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 03:58:43 2010
@@ -78,10 +78,10 @@ object Router extends Log {
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class Router(var queue:DispatchQueue) extends DispatchLogging {
+class Router(val host:VirtualHost) extends DispatchLogging {
 
   override protected def log = Router
-  protected def dispatchQueue:DispatchQueue = queue
+  protected def dispatchQueue:DispatchQueue = host.dispatchQueue
 
   trait DestinationNode {
     var targets = List[DeliveryConsumer]()
@@ -120,22 +120,39 @@ class Router(var queue:DispatchQueue) ex
   }
 
   class QueueDestinationNode(destination:Destination) extends DestinationNode {
-    val queue = new Queue(destination)
+    var queue:Queue = null // stays null until host.getQueue hands the queue over
+
+    // once the queue is created.. connect it up with the producers and targets.
+    host.getQueue(destination) { q =>
+      dispatchQueue {
+        if( q!=null ) { // getQueue passes null when the queue could not be obtained
+          queue = q
+          queue.bind(targets)
+          routes.foreach({route=> route.connected(queue :: Nil) })
+        }
+      }
+    }
 
     def on_bind(x:List[DeliveryConsumer]) =  {
       targets = x ::: targets
-      queue.bind(x)
+      if( queue!=null ) {
+        queue.bind(x)
+      }
     }
 
     def on_unbind(x:List[DeliveryConsumer]):Boolean = {
       targets = targets.filterNot({t=>x.contains(t)})
-      queue.unbind(x)
+      if( queue!=null ) {
+        queue.unbind(x)
+      }
       routes == Nil && targets == Nil
     }
 
     def on_connect(route:DeliveryProducerRoute) = {
       routes = route :: routes
-      route.connected(queue :: Nil)
+      if( queue!=null ) {
+        route.connected(queue :: Nil)
+      }
     }
   }
 
@@ -156,13 +173,13 @@ class Router(var queue:DispatchQueue) ex
 
   def bind(destination:Destination, targets:List[DeliveryConsumer]) = retaining(targets) {
       get(destination).on_bind(targets)
-    } >>: queue
+    } >>: dispatchQueue
 
   def unbind(destination:Destination, targets:List[DeliveryConsumer]) = releasing(targets) {
       if( get(destination).on_unbind(targets) ) {
         destinations.remove(destination)
       }
-    } >>: queue
+    } >>: dispatchQueue
 
   def connect(destination:Destination, routeQueue:DispatchQueue, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
     val route = new DeliveryProducerRoute(destination, routeQueue, producer) {
@@ -172,7 +189,7 @@ class Router(var queue:DispatchQueue) ex
     }
     ^ {
       get(destination).on_connect(route)
-    } >>: queue
+    } >>: dispatchQueue
   }
 
   def isTopic(destination:Destination) = destination.getDomain == TOPIC_DOMAIN
@@ -180,7 +197,7 @@ class Router(var queue:DispatchQueue) ex
 
   def disconnect(route:DeliveryProducerRoute) = releasing(route) {
       get(route.destination).on_disconnect(route)
-    } >>: queue
+    } >>: dispatchQueue
 
 
    def each(proc:(Destination, DestinationNode)=>Unit) = {
@@ -235,7 +252,7 @@ class DeliveryProducerRoute(val destinat
   private def internal_bind(values:List[DeliveryConsumer]) = {
     values.foreach{ x=>
       debug("producer route attaching to conusmer.")
-      targets = x.open_session(dispatchQueue) :: targets
+      targets = x.connect(dispatchQueue) :: targets
     }
   }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul  7 03:58:43 2010
@@ -36,6 +36,12 @@ import ReporterLevel._
  */
 object VirtualHost extends Log {
 
+  val destination_parser_options = new ParserOptions
+  destination_parser_options.queuePrefix = new AsciiBuffer("queue:")
+  destination_parser_options.topicPrefix = new AsciiBuffer("topic:")
+  destination_parser_options.tempQueuePrefix = new AsciiBuffer("temp-queue:")
+  destination_parser_options.tempTopicPrefix = new AsciiBuffer("temp-topic:")
+
   /**
    * Creates a default a configuration object.
    */
@@ -67,20 +73,20 @@ class VirtualHost(val broker: Broker) ex
   import VirtualHost._
   
   override protected def log = VirtualHost
-  override protected val dispatchQueue:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
+  override val dispatchQueue:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
 
   var config:VirtualHostDTO = _
   private val queueStore = new BrokerQueueStore()
   private val queues = new HashMap[AsciiBuffer, Queue]()
   private val durableSubs = new HashMap[String, DurableSubscription]()
-  val router = new Router(dispatchQueue)
+  val router = new Router(this)
 
   var names:List[String] = Nil;
   def setNamesArray( names:ArrayList[String]) = {
     this.names = names.toList
   }
 
-  var database:BrokerDatabase = new BrokerDatabase
+  var database:BrokerDatabase = new BrokerDatabase(this)
   var transactionManager:TransactionManager = new TransactionManager
 
   override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
@@ -103,29 +109,32 @@ class VirtualHost(val broker: Broker) ex
 
   override protected def _start(onCompleted:Runnable):Unit = {
 
-    database.virtualHost = this
     database.start();
 
-//      router.setDatabase(database);
+    database.listQueues { ids =>
+      for( id <- ids) {
+        database.getQueueInfo(id) { x =>
+          x match {
+            case Some(info)=>
+            dispatchQueue ^{
+              val dest = DestinationParser.parse(info.record.name, destination_parser_options)
+              if( dest.getDomain == Domain.QUEUE_DOMAIN ) {
+
+                val queue = new Queue(dest, id)
+                queue.first_seq = info.first
+                queue.last_seq = info.last
+                queue.message_seq_counter = info.last+1
+                queue.count = info.count
+
+                queues.put(info.record.name, queue)
+              }
+            }
+            case _ =>
+          }
+        }
+      }
+    }
 
-    //Recover queues:
-    queueStore.setDatabase(database);
-    queueStore.setDispatchQueue(dispatchQueue);
-    queueStore.loadQueues();
-
-    // Create Queue instances
-//        TODO:
-//        for (IQueue<Long, MessageDelivery> iQueue : queueStore.getSharedQueues()) {
-//            Queue queue = new Queue(iQueue);
-//            Domain domain = router.getDomain(Router.QUEUE_DOMAIN);
-//            Destination dest = new Destination.SingleDestination(Router.QUEUE_DOMAIN, iQueue.getDescriptor().getQueueName());
-//            queue.setDestination(dest);
-//            domain.bind(dest.getName(), queue);
-//            queues.put(dest.getName(), queue);
-//        }
-//        for (Queue queue : queues.values()) {
-//            queue.start();
-//        }
 
     //Recover transactions:
     transactionManager.virtualHost = this
@@ -154,31 +163,38 @@ class VirtualHost(val broker: Broker) ex
     onCompleted.run
   }
 
-  def createQueue(dest:Destination) :Queue = {
-//      if (!serviceState.isStarted) {
-//          //Queues from the store must be loaded before we can create new ones:
-//          throw new IllegalStateException("Can't create queue on unstarted host");
-//      }
+  def getQueue(destination:Destination)(cb: (Queue)=>Unit ) = ^{ // hands the queue for destination to cb, auto-creating it if configured; cb receives null on failure
+    if( !serviceState.isStarted ) {
+      error("getQueue can only be called while the service is running.")
+      cb(null)
+    } else {
+      // queues is keyed by the encoded destination name (as stored by addQueue), not by the Destination
+      val name = DestinationParser.toBuffer(destination, destination_parser_options)
+      val queue = queues.get(name)
+      if( queue==null && config.autoCreateQueues ) {
+        addQueue(destination)(cb)
+      } else cb(queue)
+    }
+  } |>>: dispatchQueue
 
-      val queue = queues.get(dest);
-//        TODO:
-//        // If the queue doesn't exist create it:
-//        if (queue == null) {
-//            IQueue<Long, MessageDelivery> iQueue = queueStore.createSharedQueue(dest.getName().toString());
-//            queue = new Queue(iQueue);
-//            queue.setDestination(dest);
-//            Domain domain = router.getDomain(dest.getDomain());
-//            domain.bind(dest.getName(), queue);
-//            queues.put(dest.getName(), queue);
-//
-//            for (QueueLifecyleListener l : queueLifecyleListeners) {
-//                l.onCreate(queue);
-//            }
-//        }
-//        queue.start();
-      queue;
-  }
 
+  def addQueue(dest:Destination)(cb: (Queue)=>Unit ) = ^{ // records a new queue in the database, then registers it and passes it to cb
+    val name = DestinationParser.toBuffer(dest, destination_parser_options) // encoded name, also used as the key in queues
+    val record = QueueRecord(0, name, null, null)
+    database.addQueue(record) { rc =>
+      rc match {
+        case Some(id) =>
+          dispatchQueue ^ { // registration and callback are submitted to dispatchQueue
+            val queue = new Queue(dest, id)
+            queues.put(name, queue)
+            cb(queue)
+          }
+        case None => // store could not create
+          cb(null) // invoked directly from the database callback, without going through dispatchQueue
+      }
+    }
+    null // NOTE(review): value of the task block; looks unused, confirm before removing
+  } |>>: dispatchQueue
 
   def createSubscription(consumer:ConsumerContext):BrokerSubscription = {
       createSubscription(consumer, consumer.getDestination());
@@ -227,7 +243,8 @@ class VirtualHost(val broker: Broker) ex
       var queue = queues.get(destination.getName());
       if (queue == null) {
           if (consumer.autoCreateDestination()) {
-              queue = createQueue(destination);
+//            TODO
+//              queue = createQueue(destination);
           } else {
               throw new IllegalStateException("The queue does not exist: " + destination.getName());
           }
@@ -249,25 +266,25 @@ class VirtualHost(val broker: Broker) ex
   }
 }
 
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class BrokerDatabase() {
-
-  @BeanProperty
-  var store:Store=new MemoryStore;
-
-  @BeanProperty
-  var virtualHost:VirtualHost=null;
-
-    def start() ={
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
-    def stop() = {
-        //To change body of implemented methods use File | Settings | File Templates.
-    }
-
+///**
+// * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+// */
+//class BrokerDatabase() {
+//
+//  @BeanProperty
+//  var store:Store=new MemoryStore;
+//
+//  @BeanProperty
+//  var virtualHost:VirtualHost=null;
+//
+//    def start() ={
+//        //To change body of implemented methods use File | Settings | File Templates.
+//    }
+//
+//    def stop() = {
+//        //To change body of implemented methods use File | Settings | File Templates.
+//    }
+//
 // TODO: re-implement.
 //    private static final boolean DEBUG = false;
 //
@@ -1587,8 +1604,8 @@ class BrokerDatabase() {
 //    public void setStoreBypass(boolean enable) {
 //        this.storeBypass = enable;
 //    }
-
-}
+//
+//}
 
 
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul  7 03:58:43 2010
@@ -396,12 +396,12 @@ abstract class BaseBrokerPerfSupport ext
         val name = new AsciiBuffer("dest" + (i + 1))
         var bean = new SingleDestination(domain, name)
         dests(i) = bean;
-        if (PTP) {
-          sendBroker.defaultVirtualHost.createQueue(dests(i));
-          if (MULTI_BROKER) {
-            rcvBroker.defaultVirtualHost.createQueue(dests(i));
-          }
-        }
+//        if (PTP) {
+//          sendBroker.defaultVirtualHost.createQueue(dests(i));
+//          if (MULTI_BROKER) {
+//            rcvBroker.defaultVirtualHost.createQueue(dests(i));
+//          }
+//        }
       }
 
       for (i <- 0 until producerCount) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Wed Jul  7 03:58:43 2010
@@ -32,5 +32,13 @@ public class VirtualHostDTO extends Serv
 
     @XmlElementRef   
     public StoreDTO store;
-    
+
+    /**
+     * Should queues be auto created when they are first accessed
+     * by clients?
+     */
+    @XmlAttribute(name="auto-create-queues")
+    public boolean autoCreateQueues = true;
+
+
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 03:58:43 2010
@@ -100,12 +100,14 @@ case class StompFrameMessage(frame:Stomp
     header match {
       case (Stomp.Headers.Message.MESSAGE_ID, value) =>
         id = value
-      case (Stomp.Headers.Message.PRORITY, value) =>
+      case (Stomp.Headers.Send.PRIORITY, value) =>
         priority = java.lang.Integer.parseInt(value).toByte
-      case (Stomp.Headers.Message.DESTINATION, value) =>
+      case (Stomp.Headers.Send.DESTINATION, value) =>
         destination = value
-      case (Stomp.Headers.Message.EXPIRATION_TIME, value) =>
+      case (Stomp.Headers.Send.EXPIRATION_TIME, value) =>
         expiration = java.lang.Long.parseLong(value)
+      case (Stomp.Headers.Send.PERSISTENT, value) =>
+        persistent = java.lang.Boolean.parseBoolean(value)
       case _ =>
     }
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:58:43 2010
@@ -32,6 +32,8 @@ import java.io.IOException
 
 object StompConstants {
 
+  val PROTOCOL = new AsciiBuffer("stomp");
+
   val options = new ParserOptions
   options.queuePrefix = new AsciiBuffer("/queue/")
   options.topicPrefix = new AsciiBuffer("/topic/")
@@ -70,8 +72,8 @@ class StompProtocolHandler extends Proto
 
     def matches(message:Delivery) = true
 
-    def open_session(producer_queue:DispatchQueue) = new DeliverySession {
-      val session = session_manager.session(producer_queue)
+    def connect(producer_queue:DispatchQueue) = new DeliverySession {
+      val session = session_manager.open(producer_queue)
 
       val consumer = SimpleConsumer.this
       retain
@@ -82,7 +84,7 @@ class StompProtocolHandler extends Proto
       }
 
       def close = {
-        session.close
+        session_manager.close(session)
         release
       }
     }
@@ -240,7 +242,15 @@ class StompProtocolHandler extends Proto
         StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content))
       }
       
-      val delivery = Delivery(message, message.frame.size)
+      val delivery = new Delivery
+      delivery.message = message
+      delivery.size = message.frame.size
+      if( message.persistent ) {
+        // TODO:
+//        val content = ascii("todo")
+//        delivery.ref = host.database.createMessageRecord(message.id, content, PROTOCOL)
+      }
+
       connection.transport.suspendRead
       delivery.setDisposer(^{
         connection.transport.resumeRead