You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:58:43 UTC
svn commit: r961111 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/
activemq-dto/src/main/java/org/apache/activemq/ap...
Author: chirino
Date: Wed Jul 7 03:58:43 2010
New Revision: 961111
URL: http://svn.apache.org/viewvc?rev=961111&view=rev
Log:
starting to flesh out the persistence side of things
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:58:43 2010
@@ -269,7 +269,7 @@ object Queue extends Log {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Queue(val destination:Destination) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
+class Queue(val destination:Destination, val storeId:Long) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
override protected def log = Queue
override val dispatchQueue:DispatchQueue = createQueue("queue:"+destination);
@@ -278,7 +278,33 @@ class Queue(val destination:Destination)
debug("created queue for: "+destination)
}
- val delivery_buffer = new DeliveryBuffer
+ // sequence numbers.. used to track what's in the store.
+ var first_seq = -1L
+ var last_seq = -1L
+ var message_seq_counter=0L
+ def next_message_seq = {
+ val rc = message_seq_counter
+ message_seq_counter += 1
+ rc
+ }
+
+ var count = 0
+
+ val delivery_buffer = new DeliveryBuffer {
+
+ override def send(delivery: Delivery) = {
+ // Is it a persistent message?
+ if( delivery.ref!=null ) {
+ // next_message_seq
+
+ }
+ super.send(delivery)
+ }
+
+ override def ack(delivery: Delivery) = {
+ super.ack(delivery)
+ }
+ }
delivery_buffer.eventHandler = ^{ drain_delivery_buffer }
val session_manager = new DeliverySessionManager(delivery_buffer, dispatchQueue)
@@ -292,7 +318,7 @@ class Queue(val destination:Destination)
var bound=true
def deliver(value:Delivery):Unit = {
- val delivery = Delivery(value)
+ val delivery = value.copy
delivery.setDisposer(^{
^{ completed(value) } >>:dispatchQueue
})
@@ -315,7 +341,7 @@ class Queue(val destination:Destination)
def connected(consumers:List[DeliveryConsumer]) = bind(consumers)
def bind(consumers:List[DeliveryConsumer]) = retaining(consumers) {
for ( consumer <- consumers ) {
- val cs = new ConsumerState(consumer.open_session(dispatchQueue))
+ val cs = new ConsumerState(consumer.connect(dispatchQueue))
allConsumers += consumer->cs
readyConsumers.addLast(cs)
}
@@ -353,16 +379,16 @@ class Queue(val destination:Destination)
}
}
- def open_session(producer_queue:DispatchQueue) = new DeliverySession {
+ def connect(producer_queue:DispatchQueue) = new DeliverySession {
- val session = session_manager.session(producer_queue)
+ val session = session_manager.open(producer_queue)
val consumer = Queue.this
retain
def deliver(delivery:Delivery) = session.send(delivery)
def close = {
- session.close
+ session_manager.close(session)
release
}
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala?rev=961111&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/BrokerDatabase.scala Wed Jul 7 03:58:43 2010
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.java.lang.{String}
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtbuf._
+import org.apache.activemq.util.TreeMap
+import java.util.concurrent.atomic.{AtomicLong}
+import java.util.{HashSet}
+import collection.JavaConversions
+
+class Record
+case class QueueRecord(val id:Long, val name:AsciiBuffer, val parent:AsciiBuffer, val config:String) extends Record
+class MessageRecord(val id:Long, val msgId:AsciiBuffer, val encoding: AsciiBuffer, val message:Buffer) extends Record
+class QueueEntryRecord(val queue:Long, val seqId:Long, val msgId:Long) extends Record
+class SubscriptionRecord(val id:Long, val pk:AsciiBuffer, val selector:AsciiBuffer, val destination:AsciiBuffer, val durable:Boolean, val tte:Long, val attachment:Buffer) extends Record
+class Action
+case class CreateRecord(record:Record) extends Action
+case class UpdateRecord(record:Record) extends Action
+case class DeleteRecord(id:Long) extends Action
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class BrokerDatabase(host:VirtualHost) {
+
+ def start() ={
+ }
+
+ def stop() = {
+ }
+
+ private object messages {
+
+ val dispatchQueue = createQueue("MessagesTable")
+ val messages = new TreeMap[Long, MessageRecord]
+ val inprogress = new HashSet[Long]()
+
+ def add(record:MessageRecord) = {
+ val id= record.id
+
+ // the inprogress list protects the message from being
+ // garbage collected too soon. Protection ends once the
+ // StoredMessageRef is disposed.
+ val ref = new StoredMessageRef(id) {
+ override def dispose = ^{
+ inprogress.remove(id)
+ } >>: dispatchQueue
+ }
+
+ using(ref) {
+ inprogress.add(id)
+ messages.put(record.id, record)
+ } >>: dispatchQueue
+
+ ref
+ }
+
+ def get(id:Long, cb:(MessageRecord)=>Unit) = reply(cb) {
+ messages.get(id)
+ } >>: dispatchQueue
+
+ }
+
+ private val msg_id_generator = new AtomicLong
+ def createMessageRecord(msgId:AsciiBuffer, encoding:AsciiBuffer, message:Buffer) = {
+ val record = new MessageRecord(msg_id_generator.incrementAndGet, msgId, encoding, message)
+ messages.add(record)
+ }
+
+
+
+ case class QueueData(val record:QueueRecord) {
+ var messges:List[Long] = Nil
+ }
+
+ private object queues {
+ var _next_id = 0L;
+ def next_id = {
+ val rc = _next_id
+ _next_id += 1
+ rc
+ }
+
+ val dispatchQueue = createQueue("QueuesTable")
+ val records = new TreeMap[Long, QueueData]
+ }
+
+ case class QueueInfo(record:QueueRecord, first:Long, last:Long, count:Int)
+
+ def listQueues(cb: (Seq[Long])=>Unit ) = reply(cb) {
+ JavaConversions.asSet(queues.records.keySet).toSeq
+ } >>: queues.dispatchQueue
+
+ def getQueueInfo(id:Long)(cb:(Option[QueueInfo])=>Unit ) = reply(cb) {
+ val qd = queues.records.get(id)
+ if( qd == null ) {
+ None
+ } else {
+ Some(
+ if( qd.messges.isEmpty ) {
+ QueueInfo(qd.record, -1, -1, 0)
+ } else {
+ QueueInfo(qd.record, qd.messges.head, qd.messges.last, qd.messges.size)
+ }
+ )
+ }
+ } >>: queues.dispatchQueue
+
+
+ def addQueue(record:QueueRecord)(cb:(Option[Long])=>Unit):Unit = reply(cb) {
+ val id = queues.next_id
+ if( queues.records.containsKey(id) ) {
+ None
+ } else {
+ queues.records.put(id, QueueData(record))
+ Some(id)
+ }
+ } >>: queues.dispatchQueue
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:58:43 2010
@@ -16,15 +16,19 @@
*/
package org.apache.activemq.apollo.broker
-import _root_.java.util.{LinkedList}
import _root_.org.apache.activemq.filter.{MessageEvaluationContext}
import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch._
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
import org.apache.activemq.transport.Transport
import org.fusesource.hawtbuf._
+import org.apache.activemq.util.TreeMap
+import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
+import java.util.{HashSet, LinkedList}
/**
+ * A producer which sends Delivery objects to a delivery consumer.
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait DeliveryProducer {
@@ -32,23 +36,30 @@ trait DeliveryProducer {
}
/**
+ * The delivery consumer accepts messages from a delivery producer.
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait DeliverySession {
- val consumer:DeliveryConsumer
- def deliver(delivery:Delivery)
- def close:Unit
+trait DeliveryConsumer extends Retained {
+ def matches(message:Delivery)
+ val dispatchQueue:DispatchQueue;
+ def connect(producer:DispatchQueue):DeliverySession
}
/**
+ * Before a delivery producer can send Delivery objects to a delivery
+ * consumer, it creates a Delivery session which it uses to send
+ * the deliveries over.
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait DeliveryConsumer extends Retained {
- def matches(message:Delivery)
- val dispatchQueue:DispatchQueue;
- def open_session(producer_queue:DispatchQueue):DeliverySession
+trait DeliverySession {
+ val consumer:DeliveryConsumer
+ def deliver(delivery:Delivery)
+ def close:Unit
}
+
/**
* Abstracts wire protocol message implementations. Each wire protocol
* will provide its own type of Message.
@@ -95,311 +106,68 @@ trait Message {
}
+case class StoredMessageRef(id:Long) extends BaseRetained
+
/**
+ * <p>
+ * A new Delivery object is created every time a message is transferred between a producer and
+ * its consumer or consumers. Consumers will retain the object to flow control the producer.
+ * </p>
+ * <p>
+ * Once this object is disposed, the producer is free to send more deliveries to the consumers.
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
object Delivery {
- def apply(o:Delivery) = new Delivery(o.message, o.size, o.encoded, o.encoding, o.ack, o.tx_id, o.store_id)
+// def apply(o:Delivery) = {
+// rc new Delivery();
+// o.message, o.size, o.encoded, o.encoding, o.ack, o.tx_id, o.store_id)
+// }
}
-case class Delivery (
- /**
- * the message being delivered
- */
- message: Message,
+class Delivery extends BaseRetained {
/**
* memory size of the delivery. Used for resource allocation tracking
*/
- size:Int,
+ var size:Int = 0
/**
- * the encoded form of the message being delivered.
+ * the encoding format of the message
*/
- encoded: Buffer = null,
+ var encoding: String = null
/**
- * the encoding format of the message
+ * the message being delivered
*/
- encoding: String = null,
+ var message: Message = null
/**
- * true if this delivery requires acknowledgment.
+ * the encoded form of the message being delivered.
*/
- ack:Boolean = false,
+ var encoded: Buffer = null
- /**
- * The id used to identify the transaction that the message
- * belongs to.
- */
- tx_id:Long = -1,
-
- /**
- * The id used to identify this message in the message
- * store.
- *
- * @return The store tracking or -1 if not set.
- */
- store_id: Long = -1
-
-) extends BaseRetained {
-
-}
-
-//abstract class BrokerMessageDelivery extends MessageDelivery {
-// TODO:
-// // True while the message is being dispatched to the delivery targets:
-// boolean dispatching = false;
-//
-// // A non null pending save indicates that the message is the
-// // saver queue and that the message
-// OperationContext<?> pendingSave;
-//
-// // List of persistent targets for which the message should be saved
-// // when dispatch is complete:
-// HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>> persistentTargets;
-// SaveableQueueElement<MessageDelivery> singleTarget;
-//
-// long storeTracking = -1;
-// BrokerDatabase store;
-// boolean fromStore = false;
-// boolean enableFlushDelay = true;
-// private int limiterSize = -1;
-// private long tid=-1;
-//
-// public void setFromDatabase(BrokerDatabase database, MessageRecord mRecord) {
-// fromStore = true;
-// store = database;
-// storeTracking = mRecord.getKey();
-// limiterSize = mRecord.getSize();
-// }
-//
-// public final int getFlowLimiterSize() {
-// if (limiterSize == -1) {
-// limiterSize = getMemorySize();
-// }
-// return limiterSize;
-// }
-//
-// /**
-// * When an application wishes to include a message in a broker transaction
-// * it must set this the tid returned by {@link Transaction#getTid()}
-// *
-// * @param tid
-// * Sets the tid used to identify the transaction at the broker.
-// */
-// public void setTransactionId(long tid) {
-// this.tid = tid;
-// }
-//
-// /**
-// * @return The tid used to identify the transaction at the broker.
-// */
-// public final long getTransactionId() {
-// return tid;
-// }
-//
-// public final void clearTransactionId() {
-// tid = -1;
-// }
-//
-// /**
-// * Subclass must implement this to return their current memory size
-// * estimate.
-// *
-// * @return The memory size of the message.
-// */
-// public abstract int getMemorySize();
-//
-// public final boolean isFromStore() {
-// return fromStore;
-// }
-//
-// public final void persist(SaveableQueueElement<MessageDelivery> sqe, ISourceController<?> controller, boolean delayable) {
-// synchronized (this) {
-// // Can flush of this message to the store be delayed?
-// if (enableFlushDelay && !delayable) {
-// enableFlushDelay = false;
-// }
-// // If this message is being dispatched then add the queue to the
-// // list of queues for which to save the message when dispatch is
-// // finished:
-// if (dispatching) {
-// addPersistentTarget(sqe);
-// return;
-// }
-// // Otherwise, if it is still in the saver queue, we can add this
-// // queue to the queue list:
-// else if (pendingSave != null) {
-// addPersistentTarget(sqe);
-// if (!delayable) {
-// pendingSave.requestFlush();
-// }
-// return;
-// }
-// }
-//
-// store.saveMessage(sqe, controller, delayable);
-// }
-//
-// public final void acknowledge(SaveableQueueElement<MessageDelivery> sqe) {
-// boolean firePersistListener = false;
-// boolean deleted = false;
-// synchronized (this) {
-// // If the message hasn't been saved to the database
-// // then we don't need to issue a delete:
-// if (dispatching || pendingSave != null) {
-//
-// deleted = true;
-//
-// removePersistentTarget(sqe.getQueueDescriptor());
-// // We get a save context when we place the message in the
-// // database queue. If it has been added to the queue,
-// // and we've removed the last queue, see if we can cancel
-// // the save:
-// if (pendingSave != null && !hasPersistentTargets()) {
-// if (pendingSave.cancel()) {
-// pendingSave = null;
-// if (isPersistent()) {
-// firePersistListener = true;
-// }
-// }
-// }
-// }
-// }
-//
-// if (!deleted) {
-// store.deleteQueueElement(sqe);
-// }
-//
-// if (firePersistListener) {
-// onMessagePersisted();
-// }
-//
-// }
-//
-// public final void setStoreTracking(long tracking) {
-// if (storeTracking == -1) {
-// storeTracking = tracking;
-// }
-// }
-//
-// public final void beginDispatch(BrokerDatabase database) {
-// this.store = database;
-// dispatching = true;
-// setStoreTracking(database.allocateStoreTracking());
-// }
-//
-// public long getStoreTracking() {
-// return storeTracking;
-// }
-//
-// public synchronized Collection<SaveableQueueElement<MessageDelivery>> getPersistentQueues() {
-// if (singleTarget != null) {
-// ArrayList<SaveableQueueElement<MessageDelivery>> list = new ArrayList<SaveableQueueElement<MessageDelivery>>(1);
-// list.add(singleTarget);
-// return list;
-// } else if (persistentTargets != null) {
-// return persistentTargets.values();
-// }
-// return null;
-// }
-//
-// public void beginStore() {
-// synchronized (this) {
-// pendingSave = null;
-// }
-// }
-//
-// private final boolean hasPersistentTargets() {
-// return (persistentTargets != null && !persistentTargets.isEmpty()) || singleTarget != null;
-// }
-//
-// private final void removePersistentTarget(QueueDescriptor queue) {
-// if (persistentTargets != null) {
-// persistentTargets.remove(queue);
-// return;
-// }
-//
-// if (singleTarget != null && singleTarget.getQueueDescriptor().equals(queue)) {
-// singleTarget = null;
-// }
-// }
-//
-// private final void addPersistentTarget(SaveableQueueElement<MessageDelivery> elem) {
-// if (persistentTargets != null) {
-// persistentTargets.put(elem.getQueueDescriptor(), elem);
-// return;
-// }
-//
-// if (singleTarget == null) {
-// singleTarget = elem;
-// return;
-// }
-//
-// if (elem.getQueueDescriptor() != singleTarget.getQueueDescriptor()) {
-// persistentTargets = new HashMap<QueueDescriptor, SaveableQueueElement<MessageDelivery>>();
-// persistentTargets.put(elem.getQueueDescriptor(), elem);
-// persistentTargets.put(singleTarget.getQueueDescriptor(), singleTarget);
-// singleTarget = null;
-// }
-// }
-//
-// public final void finishDispatch(ISourceController<?> controller) throws IOException {
-// boolean firePersistListener = false;
-// synchronized (this) {
-// // If any of the targets requested save then save the message
-// // Note that this could be the case even if the message isn't
-// // persistent if a target requested that the message be spooled
-// // for some other reason such as queue memory overflow.
-// if (hasPersistentTargets()) {
-// pendingSave = store.persistReceivedMessage(this, controller);
-// }
-//
-// // If none of the targets required persistence, then fire the
-// // persist listener:
-// if (pendingSave == null || !isPersistent()) {
-// firePersistListener = true;
-// }
-// dispatching = false;
-// }
-//
-// if (firePersistListener) {
-// onMessagePersisted();
-// }
-// }
-//
-// public final MessageRecord createMessageRecord() {
-//
-// MessageRecord record = new MessageRecord();
-// record.setEncoding(getStoreEncoding());
-// record.setBuffer(getStoreEncoded());
-// record.setStreamKey((long) 0);
-// record.setMessageId(getMsgId());
-// record.setSize(getFlowLimiterSize());
-// record.setKey(getStoreTracking());
-// return record;
-// }
-//
-// /**
-// * @return A buffer representation of the message to be stored in the store.
-// * @throws
-// */
-// protected abstract Buffer getStoreEncoded();
-//
-// /**
-// * @return The encoding scheme used to store the message.
-// */
-// protected abstract AsciiBuffer getStoreEncoding();
-//
-// public boolean isFlushDelayable() {
-// // TODO Auto-generated method stub
-// return enableFlushDelay;
-// }
-//}
+ var ref:StoredMessageRef = null
+
+ def copy() = (new Delivery).set(this)
+
+ def set(other:Delivery) = {
+ size = other.size
+ encoding = other.encoding
+ message = other.message
+ encoded = other.encoded
+ ref = other.ref
+ this
+ }
+
+}
/**
+ * <p>
+ * Defines the interface for objects which you can send flow controlled deliveries to.
+ * <p>
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait DeliverySink {
@@ -408,6 +176,11 @@ trait DeliverySink {
}
/**
+ * <p>
+ * A delivery sink which is connected to a transport. It expects the caller's dispatch
+ * queue to be the same as the transport's.
+ * <p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class TransportDeliverySink(var transport:Transport) extends DeliverySink {
@@ -416,6 +189,21 @@ class TransportDeliverySink(var transpor
}
/**
+ * <p>
+ * A delivery sink which buffers deliveries sent to it up to its
+ * maxSize settings after which it starts flow controlling the sender.
+ * <p>
+ *
+ * <p>
+ * It executes the eventHandler when it has buffered deliveries. The event handler
+ * should call receive to get the queued deliveries and then ack the delivery
+ * once it has been processed. The producer will not be resumed until
+ * the ack occurs.
+ * <p>
+ *
+ * <p>
+ * This class should only be called from a single serial dispatch queue.
+ * <p>
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
@@ -457,6 +245,10 @@ class DeliveryBuffer(var maxSize:Int=102
}
/**
+ * Implements a delivery sink which sends to a 'down stream' delivery sink. If the
+ * down stream delivery sink is full, this sink buffers the overflow deliveries. So that the
+ * down stream sink does not need to worry about overflow.
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class DeliveryOverflowBuffer(val delivery_buffer:DeliverySink) extends DeliverySink {
@@ -483,7 +275,7 @@ class DeliveryOverflowBuffer(val deliver
}
protected def send_to_delivery_buffer(value:Delivery) = {
- var delivery = Delivery(value)
+ var delivery = value.copy
delivery.setDisposer(^{
drainOverflow
})
@@ -496,11 +288,27 @@ class DeliveryOverflowBuffer(val deliver
}
/**
+ * <p>
+ * A DeliverySessionManager manages multiple credit based
+ * transmission windows from multiple producers to a single consumer.
+ * </p>
+ *
+ * <p>
+ * Producers and consumers are typically executing on different threads and
+ * the overhead of doing cross thread flow control is much higher than if
+ * a credit window is used. The credit window allows the producer to
+ * send multiple messages to the consumer without needing to wait for
+ * consumer events. Only when the producer runs out of credit does
+ * the session start overflowing the producer. This class makes heavy
+ * use of Dispatch Source objects to coalesce cross thread events
+ * like the sending of messages or credits.
+ * </p>
+ *
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class DeliverySessionManager(val sink:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
+class DeliverySessionManager(val targetSink:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
- var sessions = List[SessionServer]()
+ var sessions = List[Session]()
var session_min_credits = 1024*4;
var session_credit_capacity = 1024*32
@@ -513,6 +321,7 @@ class DeliverySessionManager(val sink:De
})
// use a event aggregating source to coalesce multiple events from the same thread.
+ // all the sessions send to the same source.
val source = createSource(new ListEventAggregator[Delivery](), queue)
source.setEventHandler(^{drain_source});
source.resume
@@ -520,92 +329,107 @@ class DeliverySessionManager(val sink:De
def drain_source = {
val deliveries = source.getData
deliveries.foreach { delivery=>
- sink.send(delivery)
+ targetSink.send(delivery)
delivery.release
}
}
- class SessionServer(val producer_queue:DispatchQueue) {
- private var _capacity = 0
-
- def capacity(value:Int) = {
- val change = value - _capacity;
- _capacity = value;
- client.credit(change)
- }
+ /**
+ * tracks one producer to consumer session / credit window.
+ */
+ class Session(val producer_queue:DispatchQueue) extends DeliveryOverflowBuffer(targetSink) {
- def drain(callback:Runnable) = {
- client.drain(callback)
+ // retain since the producer will be using this source to send messages
+ // to the consumer
+ source.retain
+
+ ///////////////////////////////////////////////////
+ // These members are used from the context of the
+ // producer serial dispatch queue
+ ///////////////////////////////////////////////////
+
+ // create a source to coalesce credit events back to the producer side...
+ val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
+ credit_adder.setEventHandler(^{
+ internal_credit(credit_adder.getData.intValue)
+ });
+ credit_adder.resume
+
+ private var credits = 0;
+
+ private var closed = false
+
+ def close = {
+ credit_adder.release
+ source.release
+ closed=true
}
- val client = new SessionClient()
+ override def full = credits <= 0
- class SessionClient() extends DeliveryOverflowBuffer(sink) {
-
- val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
- credit_adder.setEventHandler(^{
- internal_credit(credit_adder.getData.intValue)
- });
- credit_adder.resume
- source.retain
- var closed = false
-
- private var credits = 0;
-
- ///////////////////////////////////////////////////
- // These methods get called from the client/producer thread...
- ///////////////////////////////////////////////////
- def close = {
- credit_adder.release
- source.release
- closed=true
+ override def send(delivery: Delivery) = {
+ // retain the storage ref.. the target sink should
+ // release once it no longer needs it.
+ if( delivery.ref !=null ) {
+ delivery.ref.retain
}
+ super.send(delivery)
+ }
- override def full = credits <= 0
-
- override protected def send_to_delivery_buffer(value:Delivery) = {
- if( !closed ) {
- var delivery = Delivery(value)
- credit_adder.retain
- delivery.setDisposer(^{
- // This is called from the server/consumer thread
- credit_adder.merge(delivery.size);
- credit_adder.release
- })
- internal_credit(-delivery.size)
- source.merge(delivery)
- }
+ override protected def send_to_delivery_buffer(value:Delivery) = {
+ if( !closed ) {
+ var delivery = value.copy
+ credit_adder.retain
+ delivery.setDisposer(^{
+ // once the delivery is received by the consumer, send
+ // the credit back to the producer.
+ credit_adder.merge(delivery.size);
+ credit_adder.release
+ })
+ internal_credit(-delivery.size)
+ // use the source to send the consumer a delivery event.
+ source.merge(delivery)
}
+ }
- def internal_credit(value:Int) = {
- credits += value;
- if( closed || credits <= 0 ) {
- credits = 0
- } else {
- drainOverflow
- }
+ def internal_credit(value:Int) = {
+ credits += value;
+ if( closed || credits <= 0 ) {
+ credits = 0
+ } else {
+ drainOverflow
}
+ }
- ///////////////////////////////////////////////////
- // These methods get called from the server/consumer thread...
- ///////////////////////////////////////////////////
- def credit(value:Int) = ^{ internal_credit(value) } >>: producer_queue
+ ///////////////////////////////////////////////////
+ // These members are used from the context of the
+ // consumer serial dispatch queue
+ ///////////////////////////////////////////////////
- def drain(callback:Runnable) = {
- credits = 0
- if( callback!=null ) {
- queue << callback
- }
- }
+ private var _capacity = 0
+
+ def credit(value:Int) = ^{
+ internal_credit(value)
+ } >>: producer_queue
+
+ def capacity(value:Int) = {
+ val change = value - _capacity;
+ _capacity = value;
+ credit(change)
}
+
}
- def session(producer_queue:DispatchQueue) = {
- val session = new SessionServer(producer_queue)
+ def open(producer_queue:DispatchQueue):DeliverySink = {
+ val session = createSession(producer_queue)
sessions = session :: sessions
session.capacity(session_max_credits)
- session.client
+ session
}
+ def close(session:DeliverySink) = {
+ session.asInstanceOf[DeliverySessionManager#Session].close
+ }
+ protected def createSession(producer_queue:DispatchQueue) = new Session(producer_queue)
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Destinations.scala Wed Jul 7 03:58:43 2010
@@ -23,11 +23,12 @@ import BufferConversions._
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
class ParserOptions {
- var defaultDomain:AsciiBuffer = null
- var queuePrefix:AsciiBuffer = null
- var topicPrefix:AsciiBuffer = null
- var tempQueuePrefix:AsciiBuffer = null
- var tempTopicPrefix:AsciiBuffer = null
+ var defaultDomain: AsciiBuffer = null
+ var queuePrefix: AsciiBuffer = null
+ var topicPrefix: AsciiBuffer = null
+ var tempQueuePrefix: AsciiBuffer = null
+ var tempTopicPrefix: AsciiBuffer = null
+ var separator: Option[Byte] = None
}
/**
@@ -35,83 +36,107 @@ class ParserOptions {
*/
object DestinationParser {
- /**
- * Parses a simple destination.
- *
- * @param value
- * @param options
- * @return
- */
- def parse(value:AsciiBuffer, options:ParserOptions ):Destination = {
- if (options.queuePrefix!=null && value.startsWith(options.queuePrefix)) {
- var name = value.slice(options.queuePrefix.length, value.length).ascii();
- return new SingleDestination(Domain.QUEUE_DOMAIN, name);
- } else if (options.topicPrefix!=null && value.startsWith(options.topicPrefix)) {
- var name = value.slice(options.topicPrefix.length, value.length).ascii();
- return new SingleDestination(Domain.TOPIC_DOMAIN, name);
- } else if (options.tempQueuePrefix!=null && value.startsWith(options.tempQueuePrefix)) {
- var name = value.slice(options.tempQueuePrefix.length, value.length).ascii();
- return new SingleDestination(Domain.TEMP_QUEUE_DOMAIN, name);
- } else if (options.tempTopicPrefix!=null && value.startsWith(options.tempTopicPrefix)) {
- var name = value.slice(options.tempTopicPrefix.length, value.length).ascii();
- return new SingleDestination(Domain.TEMP_TOPIC_DOMAIN, name);
- } else {
- if( options.defaultDomain==null ) {
- return null;
+ /**
+ * Encodes a destination into its buffer form, using the same prefixes
+ * that parse recognizes.
+ *
+ * A composite destination is written as its children joined by
+ * options.separator (which must be defined); a single destination is
+ * written as the prefix configured for its domain followed by its name.
+ *
+ * @param value the destination to encode; null yields null
+ * @param options the domain prefixes and the optional composite separator
+ * @return the encoded destination as an AsciiBuffer
+ */
+ def toBuffer(value: Destination, options: ParserOptions): AsciiBuffer = {
+ if (value == null) {
+ null
+ } else {
+ val baos = new ByteArrayOutputStream
+ def write(value: Destination):Unit = {
+ if (value.getDestinations != null) {
+ assert( options.separator.isDefined )
+ // Tracks whether a child has been written yet, so the separator
+ // goes between children and never before the first one.
+ var first = true
+ for (d <- value.getDestinations) {
+ if (!first) {
+ baos.write(options.separator.get)
}
- return new SingleDestination(options.defaultDomain, value);
+ first = false
+ write(d)
+ }
+ } else {
+ // Only the four domains below have a prefix; any other domain has no matching case.
+ value.getDomain match {
+ case Domain.QUEUE_DOMAIN =>
+ baos.write(options.queuePrefix)
+ case Domain.TOPIC_DOMAIN =>
+ baos.write(options.topicPrefix)
+ case Domain.TEMP_QUEUE_DOMAIN =>
+ baos.write(options.tempQueuePrefix)
+ case Domain.TEMP_TOPIC_DOMAIN =>
+ baos.write(options.tempTopicPrefix)
+ }
+ baos.write(value.getName)
}
+ }
+ write(value)
+ baos.toBuffer.ascii
}
+ }
- /**
- * Parses a destination which may or may not be a composite.
- *
- * @param value
- * @param options
- * @param compositeSeparator
- * @return
- */
- def parse(value:AsciiBuffer, options:ParserOptions , compositeSeparator:Byte ):Destination = {
- if( value == null ) {
- return null;
- }
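+ /**
+ * Parses a destination which may or may not be a composite.
+ * A value is treated as a composite only when options.separator is
+ * defined and the value contains that separator byte.
+ *
+ * @param value the encoded destination; null yields null
+ * @param options the domain prefixes, default domain and optional composite separator
+ * @return the parsed destination, or null if the value (or any part of a composite) cannot be parsed
+ */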
+ def parse(value: AsciiBuffer, options: ParserOptions): Destination = {
+ if (value == null) {
+ return null;
+ }
- if( value.contains(compositeSeparator) ) {
- var rc = value.split(compositeSeparator);
- var dl:List[Destination] = Nil
- for (buffer <- rc) {
- val d = parse(buffer, options)
- if( d==null ) {
- return null;
- }
- dl = dl ::: d :: Nil
- }
- return new MultiDestination(dl.toArray[Destination]);
+ if (options.separator.isDefined && value.contains(options.separator.get)) {
+ var rc = value.split(options.separator.get);
+ var dl: List[Destination] = Nil
+ for (buffer <- rc) {
+ val d = parse(buffer, options)
+ if (d == null) {
+ return null;
+ }
+ dl = dl ::: d :: Nil
+ }
+ return new MultiDestination(dl.toArray[Destination]);
+ } else {
+ if (options.queuePrefix != null && value.startsWith(options.queuePrefix)) {
+ var name = value.slice(options.queuePrefix.length, value.length).ascii();
+ return new SingleDestination(Domain.QUEUE_DOMAIN, name);
+ } else if (options.topicPrefix != null && value.startsWith(options.topicPrefix)) {
+ var name = value.slice(options.topicPrefix.length, value.length).ascii();
+ return new SingleDestination(Domain.TOPIC_DOMAIN, name);
+ } else if (options.tempQueuePrefix != null && value.startsWith(options.tempQueuePrefix)) {
+ var name = value.slice(options.tempQueuePrefix.length, value.length).ascii();
+ return new SingleDestination(Domain.TEMP_QUEUE_DOMAIN, name);
+ } else if (options.tempTopicPrefix != null && value.startsWith(options.tempTopicPrefix)) {
+ var name = value.slice(options.tempTopicPrefix.length, value.length).ascii();
+ return new SingleDestination(Domain.TEMP_TOPIC_DOMAIN, name);
+ } else {
+ if (options.defaultDomain == null) {
+ return null;
}
- return parse(value, options);
+ return new SingleDestination(options.defaultDomain, value);
+ }
}
+ }
}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-case class SingleDestination(var domain:AsciiBuffer=null, var name:AsciiBuffer=null) extends Destination {
+case class SingleDestination(var domain: AsciiBuffer = null, var name: AsciiBuffer = null) extends Destination {
+ def getDestinations(): Array[Destination] = null;
+ def getDomain(): AsciiBuffer = domain
- def getDestinations():Array[Destination] = null;
- def getDomain():AsciiBuffer = domain
- def getName():AsciiBuffer = name
+ def getName(): AsciiBuffer = name
- override def toString() = ""+domain+":"+name
+ override def toString() = "" + domain + ":" + name
}
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-case class MultiDestination(var destinations:Array[Destination]) extends Destination {
+case class MultiDestination(var destinations: Array[Destination]) extends Destination {
+ def getDestinations(): Array[Destination] = destinations;
+ def getDomain(): AsciiBuffer = null
- def getDestinations():Array[Destination] = destinations;
- def getDomain():AsciiBuffer = null
- def getName():AsciiBuffer = null
+ def getName(): AsciiBuffer = null
override def toString() = destinations.mkString(",")
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 03:58:43 2010
@@ -78,10 +78,10 @@ object Router extends Log {
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class Router(var queue:DispatchQueue) extends DispatchLogging {
+class Router(val host:VirtualHost) extends DispatchLogging {
override protected def log = Router
- protected def dispatchQueue:DispatchQueue = queue
+ protected def dispatchQueue:DispatchQueue = host.dispatchQueue
trait DestinationNode {
var targets = List[DeliveryConsumer]()
@@ -120,22 +120,39 @@ class Router(var queue:DispatchQueue) ex
}
+ /**
+ * Destination node whose queue is obtained asynchronously from the host.
+ * Until the queue arrives, binds and connects are only recorded in
+ * targets and routes; they are applied once the queue is set.
+ */
class QueueDestinationNode(destination:Destination) extends DestinationNode {
- val queue = new Queue(destination)
+ // Stays null until the host hands back the queue for this destination.
+ var queue:Queue = null
+
+ // once the queue is created.. connect it up with the producers and targets.
+ host.getQueue(destination) { q =>
+ dispatchQueue {
+ // The host passes null when it has no queue to offer; leave the node unconnected in that case.
+ if( q!=null ) {
+ queue = q;
+ queue.bind(targets)
+ routes.foreach({route=>
+ route.connected(queue :: Nil)
+ })
+ }
+ }
+ }
def on_bind(x:List[DeliveryConsumer]) = {
targets = x ::: targets
- queue.bind(x)
+ if( queue!=null ) {
+ queue.bind(x)
+ }
}
def on_unbind(x:List[DeliveryConsumer]):Boolean = {
targets = targets.filterNot({t=>x.contains(t)})
- queue.unbind(x)
+ if( queue!=null ) {
+ queue.unbind(x)
+ }
routes == Nil && targets == Nil
}
def on_connect(route:DeliveryProducerRoute) = {
routes = route :: routes
- route.connected(queue :: Nil)
+ if( queue!=null ) {
+ route.connected(queue :: Nil)
+ }
}
}
@@ -156,13 +173,13 @@ class Router(var queue:DispatchQueue) ex
def bind(destination:Destination, targets:List[DeliveryConsumer]) = retaining(targets) {
get(destination).on_bind(targets)
- } >>: queue
+ } >>: dispatchQueue
def unbind(destination:Destination, targets:List[DeliveryConsumer]) = releasing(targets) {
if( get(destination).on_unbind(targets) ) {
destinations.remove(destination)
}
- } >>: queue
+ } >>: dispatchQueue
def connect(destination:Destination, routeQueue:DispatchQueue, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
val route = new DeliveryProducerRoute(destination, routeQueue, producer) {
@@ -172,7 +189,7 @@ class Router(var queue:DispatchQueue) ex
}
^ {
get(destination).on_connect(route)
- } >>: queue
+ } >>: dispatchQueue
}
def isTopic(destination:Destination) = destination.getDomain == TOPIC_DOMAIN
@@ -180,7 +197,7 @@ class Router(var queue:DispatchQueue) ex
def disconnect(route:DeliveryProducerRoute) = releasing(route) {
get(route.destination).on_disconnect(route)
- } >>: queue
+ } >>: dispatchQueue
def each(proc:(Destination, DestinationNode)=>Unit) = {
@@ -235,7 +252,7 @@ class DeliveryProducerRoute(val destinat
private def internal_bind(values:List[DeliveryConsumer]) = {
values.foreach{ x=>
debug("producer route attaching to conusmer.")
- targets = x.open_session(dispatchQueue) :: targets
+ targets = x.connect(dispatchQueue) :: targets
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/VirtualHost.scala Wed Jul 7 03:58:43 2010
@@ -36,6 +36,12 @@ import ReporterLevel._
*/
object VirtualHost extends Log {
+ val destination_parser_options = new ParserOptions
+ destination_parser_options.queuePrefix = new AsciiBuffer("queue:")
+ destination_parser_options.topicPrefix = new AsciiBuffer("topic:")
+ destination_parser_options.tempQueuePrefix = new AsciiBuffer("temp-queue:")
+ destination_parser_options.tempTopicPrefix = new AsciiBuffer("temp-topic:")
+
/**
* Creates a default a configuration object.
*/
@@ -67,20 +73,20 @@ class VirtualHost(val broker: Broker) ex
import VirtualHost._
override protected def log = VirtualHost
- override protected val dispatchQueue:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
+ override val dispatchQueue:DispatchQueue = ScalaDispatch.createQueue("virtual-host");
var config:VirtualHostDTO = _
private val queueStore = new BrokerQueueStore()
private val queues = new HashMap[AsciiBuffer, Queue]()
private val durableSubs = new HashMap[String, DurableSubscription]()
- val router = new Router(dispatchQueue)
+ val router = new Router(this)
var names:List[String] = Nil;
def setNamesArray( names:ArrayList[String]) = {
this.names = names.toList
}
- var database:BrokerDatabase = new BrokerDatabase
+ var database:BrokerDatabase = new BrokerDatabase(this)
var transactionManager:TransactionManager = new TransactionManager
override def toString = if (config==null) "virtual-host" else "virtual-host: "+config.id
@@ -103,29 +109,32 @@ class VirtualHost(val broker: Broker) ex
override protected def _start(onCompleted:Runnable):Unit = {
- database.virtualHost = this
database.start();
-// router.setDatabase(database);
+ // Recover the queues recorded in the database and register them under their stored name.
+ database.listQueues { ids =>
+ for( id <- ids) {
+ database.getQueueInfo(id) { x =>
+ x match {
+ case Some(info)=>
+ dispatchQueue ^{
+ val dest = DestinationParser.parse(info.record.name, destination_parser_options)
+ // parse returns null for a name it cannot map to a domain; skip such records.
+ if( dest!=null && dest.getDomain == Domain.QUEUE_DOMAIN ) {
+
+ val queue = new Queue(dest, id)
+ queue.first_seq = info.first
+ queue.last_seq = info.last
+ queue.message_seq_counter = info.last+1
+ queue.count = info.count
+
+ queues.put(info.record.name, queue)
+ }
+ }
+ case _ =>
+ }
+ }
+ }
+ }
- //Recover queues:
- queueStore.setDatabase(database);
- queueStore.setDispatchQueue(dispatchQueue);
- queueStore.loadQueues();
-
- // Create Queue instances
-// TODO:
-// for (IQueue<Long, MessageDelivery> iQueue : queueStore.getSharedQueues()) {
-// Queue queue = new Queue(iQueue);
-// Domain domain = router.getDomain(Router.QUEUE_DOMAIN);
-// Destination dest = new Destination.SingleDestination(Router.QUEUE_DOMAIN, iQueue.getDescriptor().getQueueName());
-// queue.setDestination(dest);
-// domain.bind(dest.getName(), queue);
-// queues.put(dest.getName(), queue);
-// }
-// for (Queue queue : queues.values()) {
-// queue.start();
-// }
//Recover transactions:
transactionManager.virtualHost = this
@@ -154,31 +163,38 @@ class VirtualHost(val broker: Broker) ex
onCompleted.run
}
- def createQueue(dest:Destination) :Queue = {
-// if (!serviceState.isStarted) {
-// //Queues from the store must be loaded before we can create new ones:
-// throw new IllegalStateException("Can't create queue on unstarted host");
-// }
+ /**
+ * Looks up the queue for a destination and hands it to cb.
+ *
+ * cb receives null when the service is not started, or when the queue
+ * is unknown and config.autoCreateQueues is off. When the queue is
+ * unknown and auto creation is on, the request is forwarded to addQueue.
+ *
+ * @param destination the destination whose queue is wanted
+ * @param cb callback invoked with the queue, or null
+ */
+ def getQueue(destination:Destination)(cb: (Queue)=>Unit ) = ^{
+ if( !serviceState.isStarted ) {
+ error("getQueue can only be called while the service is running.")
+ cb(null)
+ } else {
+ // queues is keyed by the encoded destination name, not by the Destination object.
+ val name = DestinationParser.toBuffer(destination, destination_parser_options)
+ val queue = queues.get(name);
+ if( queue==null && config.autoCreateQueues ) {
+ addQueue(destination)(cb)
+ } else {
+ cb(queue)
+ }
+ }
+ } |>>: dispatchQueue
- val queue = queues.get(dest);
-// TODO:
-// // If the queue doesn't exist create it:
-// if (queue == null) {
-// IQueue<Long, MessageDelivery> iQueue = queueStore.createSharedQueue(dest.getName().toString());
-// queue = new Queue(iQueue);
-// queue.setDestination(dest);
-// Domain domain = router.getDomain(dest.getDomain());
-// domain.bind(dest.getName(), queue);
-// queues.put(dest.getName(), queue);
-//
-// for (QueueLifecyleListener l : queueLifecyleListeners) {
-// l.onCreate(queue);
-// }
-// }
-// queue.start();
- queue;
- }
+ /**
+ * Asks the database to create a queue record for dest and, on success,
+ * registers a new Queue under the encoded destination name.
+ *
+ * @param dest the destination the queue is created for
+ * @param cb callback invoked with the new queue, or with null when the
+ * database reports that no record was created
+ */
+ def addQueue(dest:Destination)(cb: (Queue)=>Unit ) = ^{
+ // The encoded name is both the record name and the key in queues.
+ val name = DestinationParser.toBuffer(dest, destination_parser_options)
+ val record = QueueRecord(0, name, null, null)
+ database.addQueue(record) { rc =>
+ rc match {
+ case Some(id) =>
+ // Create and register the queue in a block handed to dispatchQueue.
+ dispatchQueue ^ {
+ val queue = new Queue(dest, id)
+ queues.put(name, queue)
+ cb(queue)
+ }
+ case None => // store could not create
+ // NOTE(review): unlike the success case, cb is invoked directly from the database callback here — confirm that is intended.
+ cb(null)
+ }
+ }
+ null
+ } |>>: dispatchQueue
def createSubscription(consumer:ConsumerContext):BrokerSubscription = {
createSubscription(consumer, consumer.getDestination());
@@ -227,7 +243,8 @@ class VirtualHost(val broker: Broker) ex
var queue = queues.get(destination.getName());
if (queue == null) {
if (consumer.autoCreateDestination()) {
- queue = createQueue(destination);
+// TODO
+// queue = createQueue(destination);
} else {
throw new IllegalStateException("The queue does not exist: " + destination.getName());
}
@@ -249,25 +266,25 @@ class VirtualHost(val broker: Broker) ex
}
}
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class BrokerDatabase() {
-
- @BeanProperty
- var store:Store=new MemoryStore;
-
- @BeanProperty
- var virtualHost:VirtualHost=null;
-
- def start() ={
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- def stop() = {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
+///**
+// * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+// */
+//class BrokerDatabase() {
+//
+// @BeanProperty
+// var store:Store=new MemoryStore;
+//
+// @BeanProperty
+// var virtualHost:VirtualHost=null;
+//
+// def start() ={
+// //To change body of implemented methods use File | Settings | File Templates.
+// }
+//
+// def stop() = {
+// //To change body of implemented methods use File | Settings | File Templates.
+// }
+//
// TODO: re-implement.
// private static final boolean DEBUG = false;
//
@@ -1587,8 +1604,8 @@ class BrokerDatabase() {
// public void setStoreBypass(boolean enable) {
// this.storeBypass = enable;
// }
-
-}
+//
+//}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/test/scala/org/apache/activemq/apollo/broker/perf/BaseBrokerPerfSupport.scala Wed Jul 7 03:58:43 2010
@@ -396,12 +396,12 @@ abstract class BaseBrokerPerfSupport ext
val name = new AsciiBuffer("dest" + (i + 1))
var bean = new SingleDestination(domain, name)
dests(i) = bean;
- if (PTP) {
- sendBroker.defaultVirtualHost.createQueue(dests(i));
- if (MULTI_BROKER) {
- rcvBroker.defaultVirtualHost.createQueue(dests(i));
- }
- }
+// if (PTP) {
+// sendBroker.defaultVirtualHost.createQueue(dests(i));
+// if (MULTI_BROKER) {
+// rcvBroker.defaultVirtualHost.createQueue(dests(i));
+// }
+// }
}
for (i <- 0 until producerCount) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-dto/src/main/java/org/apache/activemq/apollo/dto/VirtualHostDTO.java Wed Jul 7 03:58:43 2010
@@ -32,5 +32,13 @@ public class VirtualHostDTO extends Serv
@XmlElementRef
public StoreDTO store;
-
+
+ /**
+ * Should queues be auto created when they are first accessed
+ * by clients?
+ */
+ @XmlAttribute(name="auto-create-queues")
+ public boolean autoCreateQueues = true;
+
+
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 7 03:58:43 2010
@@ -100,12 +100,14 @@ case class StompFrameMessage(frame:Stomp
header match {
case (Stomp.Headers.Message.MESSAGE_ID, value) =>
id = value
- case (Stomp.Headers.Message.PRORITY, value) =>
+ case (Stomp.Headers.Send.PRIORITY, value) =>
priority = java.lang.Integer.parseInt(value).toByte
- case (Stomp.Headers.Message.DESTINATION, value) =>
+ case (Stomp.Headers.Send.DESTINATION, value) =>
destination = value
- case (Stomp.Headers.Message.EXPIRATION_TIME, value) =>
+ case (Stomp.Headers.Send.EXPIRATION_TIME, value) =>
expiration = java.lang.Long.parseLong(value)
+ case (Stomp.Headers.Send.PERSISTENT, value) =>
+ persistent = java.lang.Boolean.parseBoolean(value)
case _ =>
}
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961111&r1=961110&r2=961111&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:58:43 2010
@@ -32,6 +32,8 @@ import java.io.IOException
object StompConstants {
+ val PROTOCOL = new AsciiBuffer("stomp");
+
val options = new ParserOptions
options.queuePrefix = new AsciiBuffer("/queue/")
options.topicPrefix = new AsciiBuffer("/topic/")
@@ -70,8 +72,8 @@ class StompProtocolHandler extends Proto
def matches(message:Delivery) = true
- def open_session(producer_queue:DispatchQueue) = new DeliverySession {
- val session = session_manager.session(producer_queue)
+ def connect(producer_queue:DispatchQueue) = new DeliverySession {
+ val session = session_manager.open(producer_queue)
val consumer = SimpleConsumer.this
retain
@@ -82,7 +84,7 @@ class StompProtocolHandler extends Proto
}
def close = {
- session.close
+ session_manager.close(session)
release
}
}
@@ -240,7 +242,15 @@ class StompProtocolHandler extends Proto
StompFrameMessage(StompFrame(Stomp.Responses.MESSAGE, frame.headers, frame.content))
}
- val delivery = Delivery(message, message.frame.size)
+ val delivery = new Delivery
+ delivery.message = message
+ delivery.size = message.frame.size
+ if( message.persistent ) {
+ // TODO:
+// val content = ascii("todo")
+// delivery.ref = host.database.createMessageRecord(message.id, content, PROTOCOL)
+ }
+
connection.transport.suspendRead
delivery.setDisposer(^{
connection.transport.resumeRead