You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:59:05 UTC
svn commit: r961112 - in /activemq/sandbox/activemq-apollo-actor:
activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/
activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/
activemq-stomp/src/test/scala/org/apache/activemq/apollo...
Author: chirino
Date: Wed Jul 7 03:59:04 2010
New Revision: 961112
URL: http://svn.apache.org/viewvc?rev=961112&view=rev
Log:
Implemented simplified flow control
Added:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Modified:
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul 7 03:59:04 2010
@@ -20,13 +20,13 @@ import _root_.java.io.{File}
import _root_.java.lang.{String}
import _root_.org.apache.activemq.util.{FactoryFinder}
import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
+import org.fusesource.hawtdispatch.{Dispatch}
import org.fusesource.hawtbuf._
import ReporterLevel._
import AsciiBuffer._
-import org.apache.activemq.apollo.dto.{VirtualHostDTO, BrokerDTO}
+import org.apache.activemq.apollo.dto.{BrokerDTO}
import collection.{JavaConversions, SortedMap}
-import java.util.LinkedList
+import JavaConversions._
/**
* <p>
@@ -120,7 +120,6 @@ object Broker extends Log {
error("Broker basedir must be defined.")
}
- import JavaConversions._
for (host <- config.virtualHosts ) {
result |= VirtualHost.validate(host, reporter)
}
@@ -181,7 +180,6 @@ class Broker() extends BaseService with
// create the runtime objects from the config
{
- import JavaConversions._
dataDirectory = new File(config.basedir)
defaultVirtualHost = null
for (c <- config.virtualHosts) {
@@ -235,301 +233,3 @@ class Broker() extends BaseService with
} >>: dispatchQueue
}
-
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait QueueLifecyleListener {
-
- /**
- * A destination has bean created
- *
- * @param queue
- */
- def onCreate(queue:Queue);
-
- /**
- * A destination has bean destroyed
- *
- * @param queue
- */
- def onDestroy(queue:Queue);
-
-}
-
-
-
-
-object Queue extends Log {
- val maxOutboundSize = 1024*1204*5
-}
-
-/**
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class Queue(val destination:Destination, val storeId:Long) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
- override protected def log = Queue
-
- override val dispatchQueue:DispatchQueue = createQueue("queue:"+destination);
- dispatchQueue.setTargetQueue(getRandomThreadQueue)
- dispatchQueue {
- debug("created queue for: "+destination)
- }
-
- // sequence numbers.. used to track what's in the store.
- var first_seq = -1L
- var last_seq = -1L
- var message_seq_counter=0L
- def next_message_seq = {
- val rc = message_seq_counter
- message_seq_counter += 1
- rc
- }
-
- var count = 0
-
- val delivery_buffer = new DeliveryBuffer {
-
- override def send(delivery: Delivery) = {
- // Is it a persistent message?
- if( delivery.ref!=null ) {
- // next_message_seq
-
- }
- super.send(delivery)
- }
-
- override def ack(delivery: Delivery) = {
- super.ack(delivery)
- }
- }
- delivery_buffer.eventHandler = ^{ drain_delivery_buffer }
-
- val session_manager = new DeliverySessionManager(delivery_buffer, dispatchQueue)
-
- setDisposer(^{
- dispatchQueue.release
- session_manager.release
- })
-
- class ConsumerState(val consumer:DeliverySession) {
- var bound=true
-
- def deliver(value:Delivery):Unit = {
- val delivery = value.copy
- delivery.setDisposer(^{
- ^{ completed(value) } >>:dispatchQueue
- })
- consumer.deliver(delivery);
- delivery.release
- }
-
- def completed(delivery:Delivery) = {
- // Lets get back on the readyList if we are still bound.
- if( bound ) {
- readyConsumers.addLast(this)
- }
- delivery_buffer.ack(delivery)
- }
- }
-
- var allConsumers = Map[DeliveryConsumer,ConsumerState]()
- val readyConsumers = new LinkedList[ConsumerState]()
-
- def connected(consumers:List[DeliveryConsumer]) = bind(consumers)
- def bind(consumers:List[DeliveryConsumer]) = retaining(consumers) {
- for ( consumer <- consumers ) {
- val cs = new ConsumerState(consumer.connect(dispatchQueue))
- allConsumers += consumer->cs
- readyConsumers.addLast(cs)
- }
- drain_delivery_buffer
- } >>: dispatchQueue
-
- def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
- for ( consumer <- consumers ) {
- allConsumers.get(consumer) match {
- case Some(cs)=>
- cs.bound = false
- cs.consumer.close
- allConsumers -= consumer
- readyConsumers.remove(cs)
- case None=>
- }
- }
- } >>: dispatchQueue
-
- def disconnected() = throw new RuntimeException("unsupported")
-
- def collocate(value:DispatchQueue):Unit = {
- if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
- println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
- this.dispatchQueue.setTargetQueue(value.getTargetQueue)
- }
- }
-
-
- def drain_delivery_buffer: Unit = {
- while (!readyConsumers.isEmpty && !delivery_buffer.isEmpty) {
- val cs = readyConsumers.removeFirst
- val delivery = delivery_buffer.receive
- cs.deliver(delivery)
- }
- }
-
- def connect(producer_queue:DispatchQueue) = new DeliverySession {
-
- val session = session_manager.open(producer_queue)
- val consumer = Queue.this
- retain
-
- def deliver(delivery:Delivery) = session.send(delivery)
-
- def close = {
- session_manager.close(session)
- release
- }
- }
-
- def matches(message:Delivery) = { true }
-
-// def open_session(producer_queue:DispatchQueue) = new ConsumerSession {
-// val consumer = StompQueue.this
-// val deliveryQueue = new DeliveryOverflowBuffer(delivery_buffer)
-// retain
-//
-// def deliver(delivery:Delivery) = using(delivery) {
-// deliveryQueue.send(delivery)
-// } >>: queue
-//
-// def close = {
-// release
-// }
-// }
-
-
-}
-
-class XQueue(val destination:Destination) {
-
-// TODO:
-// private VirtualHost virtualHost;
-//
-// Queue() {
-// this.queue = queue;
-// }
-//
-// /*
-// * (non-Javadoc)
-// *
-// * @see
-// * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
-// * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
-// */
-// public void deliver(MessageDelivery message, ISourceController<?> source) {
-// queue.add(message, source);
-// }
-//
-// public final void addSubscription(final Subscription<MessageDelivery> sub) {
-// queue.addSubscription(sub);
-// }
-//
-// public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
-// return queue.removeSubscription(sub);
-// }
-//
-// public void start() throws Exception {
-// queue.start();
-// }
-//
-// public void stop() throws Exception {
-// if (queue != null) {
-// queue.stop();
-// }
-// }
-//
-// public void shutdown(Runnable onShutdown) throws Exception {
-// if (queue != null) {
-// queue.shutdown(onShutdown);
-// }
-// }
-//
-// public boolean hasSelector() {
-// return false;
-// }
-//
-// public boolean matches(MessageDelivery message) {
-// return true;
-// }
-//
-// public VirtualHost getBroker() {
-// return virtualHost;
-// }
-//
-// public void setVirtualHost(VirtualHost virtualHost) {
-// this.virtualHost = virtualHost;
-// }
-//
-// public void setDestination(Destination destination) {
-// this.destination = destination;
-// }
-//
-// public final Destination getDestination() {
-// return destination;
-// }
-//
-// public boolean isDurable() {
-// return true;
-// }
-//
-// public static class QueueSubscription implements BrokerSubscription {
-// Subscription<MessageDelivery> subscription;
-// final Queue queue;
-//
-// public QueueSubscription(Queue queue) {
-// this.queue = queue;
-// }
-//
-// /*
-// * (non-Javadoc)
-// *
-// * @see
-// * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
-// * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
-// */
-// public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
-// this.subscription = subscription;
-// queue.addSubscription(subscription);
-// }
-//
-// /*
-// * (non-Javadoc)
-// *
-// * @see
-// * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
-// * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
-// */
-// public void disconnect(ConsumerContext context) {
-// queue.removeSubscription(subscription);
-// }
-//
-// /* (non-Javadoc)
-// * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
-// */
-// public Destination getDestination() {
-// return queue.getDestination();
-// }
-// }
-
- // TODO:
- def matches(message:Delivery) = false
- def deliver(message:Delivery) = {
- // TODO:
- }
-
- def getDestination() = destination
-
- def shutdown = {}
-}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul 7 03:59:04 2010
@@ -39,7 +39,7 @@ object Connection extends Log {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-abstract class Connection() extends TransportListener with BaseService {
+abstract class Connection() extends DefaultTransportListener with BaseService {
override protected def log = Connection
@@ -47,13 +47,14 @@ abstract class Connection() extends Tran
val id = next_id
val dispatchQueue = createQueue(id)
var stopped = true
-
var transport:Transport = null
+ var transportSink:TransportSink = null
override def toString = id
override protected def _start(onCompleted:Runnable) = {
stopped = false
+ transportSink = new TransportSink(transport)
transport.setDispatchQueue(dispatchQueue);
transport.setTransportListener(Connection.this);
transport.start(onCompleted)
@@ -65,7 +66,7 @@ abstract class Connection() extends Tran
}
- def onTransportFailure(error:IOException) = {
+ override def onTransportFailure(error:IOException) = {
if (!stopped) {
onFailure(error);
}
@@ -76,10 +77,10 @@ abstract class Connection() extends Tran
transport.stop
}
- def onTransportDisconnected() = {
- }
-
- def onTransportConnected() = {
+ override def onRefill = {
+ if( transportSink.refiller !=null ) {
+ transportSink.refiller.run
+ }
}
}
@@ -109,7 +110,7 @@ class BrokerConnection(val connector: Co
override def onTransportDisconnected() = protocolHandler.onTransportDisconnected
- def onTransportCommand(command: Object) = {
+ override def onTransportCommand(command: Object) = {
try {
protocolHandler.onTransportCommand(command);
} catch {
@@ -119,6 +120,11 @@ class BrokerConnection(val connector: Co
}
override def onTransportFailure(error: IOException) = protocolHandler.onTransportFailure(error)
+
+ override def onRefill = {
+ super.onRefill
+ protocolHandler.onRefill
+ }
}
/**
@@ -133,7 +139,7 @@ class MultiProtocolHandler extends Proto
var connected = false
- def onTransportCommand(command:Any) = {
+ override def onTransportCommand(command:Any) = {
if (!command.isInstanceOf[WireFormat]) {
throw new ProtocolException("First command should be a WireFormat");
@@ -177,7 +183,7 @@ object ProtocolHandlerFactory {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait ProtocolHandler extends TransportListener {
+trait ProtocolHandler extends DefaultTransportListener {
var connection:BrokerConnection = null;
@@ -185,18 +191,10 @@ trait ProtocolHandler extends TransportL
this.connection = brokerConnection
}
- def onTransportCommand(command:Any);
-
- def onTransportFailure(error:IOException) = {
+ override def onTransportFailure(error:IOException) = {
connection.stop()
}
- def onTransportDisconnected() = {
- }
-
- def onTransportConnected() = {
- }
-
}
/**
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul 7 03:59:04 2010
@@ -1,5 +1,5 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
@@ -19,20 +19,16 @@ package org.apache.activemq.apollo.broke
import _root_.org.apache.activemq.filter.{MessageEvaluationContext}
import _root_.java.lang.{String}
import _root_.org.fusesource.hawtdispatch._
-import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import org.apache.activemq.transport.Transport
import org.fusesource.hawtbuf._
-import org.apache.activemq.util.TreeMap
-import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
-import java.util.{HashSet, LinkedList}
-
/**
* A producer which sends Delivery objects to a delivery consumer.
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait DeliveryProducer {
+ def dispatchQueue:DispatchQueue
def collocate(queue:DispatchQueue):Unit
+ def ack(message:Delivery) = {}
}
/**
@@ -41,9 +37,9 @@ trait DeliveryProducer {
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
trait DeliveryConsumer extends Retained {
+ def dispatchQueue:DispatchQueue;
def matches(message:Delivery)
- val dispatchQueue:DispatchQueue;
- def connect(producer:DispatchQueue):DeliverySession
+ def connect(producer:DeliveryProducer):Session
}
/**
@@ -53,9 +49,9 @@ trait DeliveryConsumer extends Retained
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-trait DeliverySession {
- val consumer:DeliveryConsumer
- def deliver(delivery:Delivery)
+trait Session extends Sink[Delivery] {
+ def producer:DeliveryProducer
+ def consumer:DeliveryConsumer
def close:Unit
}
@@ -119,14 +115,10 @@ case class StoredMessageRef(id:Long) ext
*
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-object Delivery {
-// def apply(o:Delivery) = {
-// rc new Delivery();
-// o.message, o.size, o.encoded, o.encoding, o.ack, o.tx_id, o.store_id)
-// }
+object Delivery extends Sizer[Delivery] {
+ def size(value:Delivery):Int = value.size
}
-
class Delivery extends BaseRetained {
/**
@@ -153,6 +145,11 @@ class Delivery extends BaseRetained {
def copy() = (new Delivery).set(this)
+ /**
+ * Does the producer require this message delivery to be acked?
+ */
+ var ack = false
+
def set(other:Delivery) = {
size = other.size
encoding = other.encoding
@@ -163,273 +160,3 @@ class Delivery extends BaseRetained {
}
}
-
-/**
- * <p>
- * Defines the interface for objects which you can send flow controlled deliveries to.
- * <p>
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait DeliverySink {
- def full:Boolean
- def send(delivery:Delivery):Unit
-}
-
-/**
- * <p>
- * A delivery sink which is connected to a transport. It expects the caller's dispatch
- * queue to be the same as the transport's/
- * <p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class TransportDeliverySink(var transport:Transport) extends DeliverySink {
- def full:Boolean = transport.isFull
- def send(delivery:Delivery) = if( transport.isConnected ) { transport.oneway(delivery.message, delivery) }
-}
-
-/**
- * <p>
- * A delivery sink which buffers deliveries sent to it up to it's
- * maxSize settings after which it starts flow controlling the sender.
- * <p>
- *
- * <p>
- * It executes the eventHandler when it has buffered deliveries. The event handler
- * should call receive to get the queued deliveries and then ack the delivery
- * once it has been processed. The producer will now be resumed until
- * the ack occurs.
- * <p>
- *
- * <p>
- * This class should only be called from a single serial dispatch queue.
- * <p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class DeliveryBuffer(var maxSize:Int=1024*32) extends DeliverySink {
-
- var deliveries = new LinkedList[Delivery]()
- private var size = 0
- var eventHandler: Runnable = null
-
- def full = size >= maxSize
-
- def drain = eventHandler.run
-
- def receive = deliveries.poll
-
- def isEmpty = deliveries.isEmpty
-
- def send(delivery:Delivery):Unit = {
- delivery.retain
- size += delivery.size
- deliveries.addLast(delivery)
- if( deliveries.size == 1 ) {
- drain
- }
- }
-
- def ack(delivery:Delivery) = {
- // When a message is delivered to the consumer, we release
- // used capacity in the outbound queue, and can drain the inbound
- // queue
- val wasBlocking = full
- size -= delivery.size
- delivery.release
- if( !isEmpty ) {
- drain
- }
- }
-
-}
-
-/**
- * Implements a delivery sink which sends to a 'down stream' delivery sink. If the
- * down stream delivery sink is full, this sink buffers the overflow deliveries. So that the
- * down stream sink does not need to worry about overflow.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class DeliveryOverflowBuffer(val delivery_buffer:DeliverySink) extends DeliverySink {
-
- private var overflow = new LinkedList[Delivery]()
-
- protected def drainOverflow:Unit = {
- while( !overflow.isEmpty && !full ) {
- val delivery = overflow.removeFirst
- delivery.release
- send_to_delivery_buffer(delivery)
- }
- }
-
- def send(delivery:Delivery) = {
- if( full ) {
- // Deliveries in the overflow queue is remain acquired by us so that
- // producer that sent it to us gets flow controlled.
- delivery.retain
- overflow.addLast(delivery)
- } else {
- send_to_delivery_buffer(delivery)
- }
- }
-
- protected def send_to_delivery_buffer(value:Delivery) = {
- var delivery = value.copy
- delivery.setDisposer(^{
- drainOverflow
- })
- delivery_buffer.send(delivery)
- delivery.release
- }
-
- def full = delivery_buffer.full
-
-}
-
-/**
- * <p>
- * A DeliverySessionManager manages multiple credit based
- * transmission windows from multiple producers to a single consumer.
- * </p>
- *
- * <p>
- * Producers and consumers are typically executing on different threads and
- * the overhead of doing cross thread flow control is much higher than if
- * a credit window is used. The credit window allows the producer to
- * send multiple messages to the consumer without needing to wait for
- * consumer events. Only when the producer runs out of credit, does the
- * he start overflowing the producer. This class makes heavy
- * use of Dispatch Source objects to coalesce cross thread events
- * like the sending messages or credits.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class DeliverySessionManager(val targetSink:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
-
- var sessions = List[Session]()
-
- var session_min_credits = 1024*4;
- var session_credit_capacity = 1024*32
- var session_max_credits = session_credit_capacity;
-
- queue.retain
- setDisposer(^{
- source.release
- queue.release
- })
-
- // use a event aggregating source to coalesce multiple events from the same thread.
- // all the sessions send to the same source.
- val source = createSource(new ListEventAggregator[Delivery](), queue)
- source.setEventHandler(^{drain_source});
- source.resume
-
- def drain_source = {
- val deliveries = source.getData
- deliveries.foreach { delivery=>
- targetSink.send(delivery)
- delivery.release
- }
- }
-
- /**
- * tracks one producer to consumer session / credit window.
- */
- class Session(val producer_queue:DispatchQueue) extends DeliveryOverflowBuffer(targetSink) {
-
- // retain since the producer will be using this source to send messages
- // to the consumer
- source.retain
-
- ///////////////////////////////////////////////////
- // These members are used from the context of the
- // producer serial dispatch queue
- ///////////////////////////////////////////////////
-
- // create a source to coalesce credit events back to the producer side...
- val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
- credit_adder.setEventHandler(^{
- internal_credit(credit_adder.getData.intValue)
- });
- credit_adder.resume
-
- private var credits = 0;
-
- private var closed = false
-
- def close = {
- credit_adder.release
- source.release
- closed=true
- }
-
- override def full = credits <= 0
-
- override def send(delivery: Delivery) = {
- // retain the storage ref.. the target sink should
- // release once it no longer needs it.
- if( delivery.ref !=null ) {
- delivery.ref.retain
- }
- super.send(delivery)
- }
-
- override protected def send_to_delivery_buffer(value:Delivery) = {
- if( !closed ) {
- var delivery = value.copy
- credit_adder.retain
- delivery.setDisposer(^{
- // once the delivery is received by the consumer, event
- // the producer the credit.
- credit_adder.merge(delivery.size);
- credit_adder.release
- })
- internal_credit(-delivery.size)
- // use the source to send the consumer a delivery event.
- source.merge(delivery)
- }
- }
-
- def internal_credit(value:Int) = {
- credits += value;
- if( closed || credits <= 0 ) {
- credits = 0
- } else {
- drainOverflow
- }
- }
-
- ///////////////////////////////////////////////////
- // These members are used from the context of the
- // consumer serial dispatch queue
- ///////////////////////////////////////////////////
-
- private var _capacity = 0
-
- def credit(value:Int) = ^{
- internal_credit(value)
- } >>: producer_queue
-
- def capacity(value:Int) = {
- val change = value - _capacity;
- _capacity = value;
- credit(change)
- }
-
- }
-
- def open(producer_queue:DispatchQueue):DeliverySink = {
- val session = createSession(producer_queue)
- sessions = session :: sessions
- session.capacity(session_max_credits)
- session
- }
-
- def close(session:DeliverySink) = {
- session.asInstanceOf[DeliverySessionManager#Session].close
- }
-
- protected def createSession(producer_queue:DispatchQueue) = new Session(producer_queue)
-}
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961112&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul 7 03:59:04 2010
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
+import collection.{SortedMap}
+import java.util.LinkedList
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait QueueLifecyleListener {
+
+ /**
+ * A destination has been created
+ *
+ * @param queue
+ */
+ def onCreate(queue:Queue);
+
+ /**
+ * A destination has been destroyed
+ *
+ * @param queue
+ */
+ def onDestroy(queue:Queue);
+
+}
+
+
+object Queue extends Log {
+ val maxOutboundSize = 1024*1024*5 // 5 MiB; previously 1024*1204*5 (transposed digits)
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Queue(val destination:Destination, val storeId:Long) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
+ override protected def log = Queue
+
+ override val dispatchQueue:DispatchQueue = createQueue("queue:"+destination);
+ dispatchQueue.setTargetQueue(getRandomThreadQueue)
+ dispatchQueue {
+ debug("created queue for: "+destination)
+ }
+
+ // sequence numbers.. used to track what's in the store.
+ var first_seq = -1L
+ var last_seq = -1L
+ var message_seq_counter=0L
+ def next_message_seq = {
+ val rc = message_seq_counter
+ message_seq_counter += 1
+ rc
+ }
+
+ var count = 0
+
+ val pending = new QueueSink[Delivery](Delivery) {
+ override def offer(delivery: Delivery) = {
+ val d = delivery.copy
+ d.ack = true
+ super.offer(d)
+ }
+ }
+ val session_manager = new SinkMux[Delivery](pending, dispatchQueue, Delivery)
+
+ pending.drainer = ^{ drain_delivery_buffer }
+
+ def drain_delivery_buffer: Unit = {
+ while (!readyConsumers.isEmpty && !pending.isEmpty) {
+ val cs = readyConsumers.removeFirst
+ val delivery = pending.poll
+ if( cs.session.offer(delivery) ) {
+ // consumer was not full.. keep him in the ready list
+ readyConsumers.addLast(cs)
+ } else {
+ // consumer full.
+ cs.ready = false
+ pending.unpoll(delivery)
+ }
+ }
+ }
+
+
+ // Use an event source to coalesce cross thread synchronization.
+ val ack_source = createSource(new ListEventAggregator[Delivery](), dispatchQueue)
+ ack_source.setEventHandler(^{drain_acks});
+ ack_source.resume
+ def drain_acks = {
+ ack_source.getData.foreach { ack =>
+ pending.ack(ack)
+ }
+ }
+ override def ack(ack:Delivery) = {
+ ack_source.merge(ack)
+ }
+
+ setDisposer(^{
+ dispatchQueue.release
+ session_manager.release
+ })
+
+ class ConsumerState(val session:Session) extends Runnable {
+ session.refiller = this
+ var bound=true
+ var ready=true
+
+ def run() = {
+ if( bound && !ready ) {
+ ready = true
+ readyConsumers.addLast(this)
+ drain_delivery_buffer
+ }
+ }
+ }
+
+ var allConsumers = Map[DeliveryConsumer,ConsumerState]()
+ val readyConsumers = new LinkedList[ConsumerState]()
+
+ def connected(consumers:List[DeliveryConsumer]) = bind(consumers)
+ def bind(consumers:List[DeliveryConsumer]) = retaining(consumers) {
+ for ( consumer <- consumers ) {
+ val cs = new ConsumerState(consumer.connect(Queue.this))
+ allConsumers += consumer->cs
+ readyConsumers.addLast(cs)
+ }
+ drain_delivery_buffer
+ } >>: dispatchQueue
+
+ def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
+ for ( consumer <- consumers ) {
+ allConsumers.get(consumer) match {
+ case Some(cs)=>
+ cs.bound = false
+ cs.session.close
+ allConsumers -= consumer
+ readyConsumers.remove(cs)
+ case None=>
+ }
+ }
+ } >>: dispatchQueue
+
+ def disconnected() = throw new RuntimeException("unsupported")
+
+ def collocate(value:DispatchQueue):Unit = {
+ if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
+ println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
+ this.dispatchQueue.setTargetQueue(value.getTargetQueue)
+ }
+ }
+
+ def connect(p:DeliveryProducer) = new Session {
+ retain
+
+ override def consumer = Queue.this
+ override def producer = p
+
+ val session = session_manager.open(producer.dispatchQueue)
+
+ def close = {
+ session_manager.close(session)
+ release
+ }
+
+ // Delegate all the flow control stuff to the session
+ def full = session.full
+ def offer(value:Delivery) = {
+ if( session.full ) {
+ false
+ } else {
+ if( value.ref !=null ) {
+ value.ref.retain
+ }
+ val rc = session.offer(value)
+ assert(rc, "session should accept since it was not full")
+ true
+ }
+ }
+
+ def refiller = session.refiller
+ def refiller_=(value:Runnable) = { session.refiller=value }
+ }
+
+ def matches(message:Delivery) = { true }
+
+// def open_session(producer_queue:DispatchQueue) = new ConsumerSession {
+// val consumer = StompQueue.this
+// val deliveryQueue = new DeliveryOverflowBuffer(delivery_buffer)
+// retain
+//
+// def deliver(delivery:Delivery) = using(delivery) {
+// deliveryQueue.send(delivery)
+// } >>: queue
+//
+// def close = {
+// release
+// }
+// }
+
+
+}
+
+class XQueue(val destination:Destination) {
+
+// TODO:
+// private VirtualHost virtualHost;
+//
+// Queue() {
+// this.queue = queue;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+// * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+// */
+// public void deliver(MessageDelivery message, ISourceController<?> source) {
+// queue.add(message, source);
+// }
+//
+// public final void addSubscription(final Subscription<MessageDelivery> sub) {
+// queue.addSubscription(sub);
+// }
+//
+// public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
+// return queue.removeSubscription(sub);
+// }
+//
+// public void start() throws Exception {
+// queue.start();
+// }
+//
+// public void stop() throws Exception {
+// if (queue != null) {
+// queue.stop();
+// }
+// }
+//
+// public void shutdown(Runnable onShutdown) throws Exception {
+// if (queue != null) {
+// queue.shutdown(onShutdown);
+// }
+// }
+//
+// public boolean hasSelector() {
+// return false;
+// }
+//
+// public boolean matches(MessageDelivery message) {
+// return true;
+// }
+//
+// public VirtualHost getBroker() {
+// return virtualHost;
+// }
+//
+// public void setVirtualHost(VirtualHost virtualHost) {
+// this.virtualHost = virtualHost;
+// }
+//
+// public void setDestination(Destination destination) {
+// this.destination = destination;
+// }
+//
+// public final Destination getDestination() {
+// return destination;
+// }
+//
+// public boolean isDurable() {
+// return true;
+// }
+//
+// public static class QueueSubscription implements BrokerSubscription {
+// Subscription<MessageDelivery> subscription;
+// final Queue queue;
+//
+// public QueueSubscription(Queue queue) {
+// this.queue = queue;
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
+// * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+// */
+// public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
+// this.subscription = subscription;
+// queue.addSubscription(subscription);
+// }
+//
+// /*
+// * (non-Javadoc)
+// *
+// * @see
+// * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
+// * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+// */
+// public void disconnect(ConsumerContext context) {
+// queue.removeSubscription(subscription);
+// }
+//
+// /* (non-Javadoc)
+// * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+// */
+// public Destination getDestination() {
+// return queue.getDestination();
+// }
+// }
+
+ // TODO:
+ def matches(message:Delivery) = false
+ def deliver(message:Delivery) = {
+ // TODO:
+ }
+
+ def getDestination() = destination
+
+ def shutdown = {}
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul 7 03:59:04 2010
@@ -181,8 +181,8 @@ class Router(val host:VirtualHost) exten
}
} >>: dispatchQueue
- def connect(destination:Destination, routeQueue:DispatchQueue, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
- val route = new DeliveryProducerRoute(destination, routeQueue, producer) {
+ def connect(destination:Destination, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
+ val route = new DeliveryProducerRoute(destination, producer) {
override def on_connected = {
completed(this);
}
@@ -214,8 +214,8 @@ class Router(val host:VirtualHost) exten
*/
trait Route extends Retained {
- val destination:Destination
- val dispatchQueue:DispatchQueue
+ def destination:Destination
+ def dispatchQueue:DispatchQueue
val metric = new AtomicLong();
def connected(targets:List[DeliveryConsumer]):Unit
@@ -228,9 +228,10 @@ trait Route extends Retained {
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
*/
-class DeliveryProducerRoute(val destination:Destination, override val dispatchQueue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging {
+class DeliveryProducerRoute(val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
override protected def log = Router
+ override def dispatchQueue = producer.dispatchQueue
// Retain the queue while we are retained.
dispatchQueue.retain
@@ -238,7 +239,7 @@ class DeliveryProducerRoute(val destinat
dispatchQueue.release
})
- var targets = List[DeliverySession]()
+ var targets = List[Session]()
def connected(targets:List[DeliveryConsumer]) = retaining(targets) {
internal_bind(targets)
@@ -252,7 +253,9 @@ class DeliveryProducerRoute(val destinat
private def internal_bind(values:List[DeliveryConsumer]) = {
values.foreach{ x=>
debug("producer route attaching to conusmer.")
- targets = x.connect(dispatchQueue) :: targets
+ val target = x.connect(producer);
+ target.refiller = drainer
+ targets ::= target
}
}
@@ -278,4 +281,50 @@ class DeliveryProducerRoute(val destinat
protected def on_connected = {}
protected def on_disconnected = {}
+ //
+ // Sink trait implementation. This Sink overflows
+ // by 1 value. It's only full when overflowed. It overflows
+ // when one of the down stream sinks cannot accept the offered
+ // Dispatch.
+ //
+
+ var overflow:Delivery=null
+ var overflowSessions = List[Session]()
+ var refiller:Runnable=null
+
+ def full = overflow!=null
+
+ def offer(value:Delivery) = { // returns false only while a previously offered value is still overflowed
+ if( full ) {
+ false
+ } else {
+ targets.foreach { target=> // every target gets the value; the ones that refuse it are remembered
+ if( !target.offer(value) ) {
+ overflowSessions ::= target
+ }
+ }
+ if( overflowSessions!=Nil ) {
+ overflow = value // keep the value so drainer can re-offer it; this makes the route full
+ }
+ true // the value is accepted even when some targets refused it
+ }
+ }
+
+ val drainer = ^{ // installed as the refiller of each target session in internal_bind
+ if( overflow!=null ) {
+ val original = overflowSessions;
+ overflowSessions = Nil
+ original.foreach { target=> // re-offer the overflowed value only to the sessions that refused it
+ if( !target.offer(overflow) ) {
+ overflowSessions ::= target
+ }
+ }
+ if( overflowSessions==Nil ) {
+ overflow = null // every session took the value: the route is no longer full
+ refiller.run // assumes a refiller has been assigned by now
+ }
+ }
+ }
+
+
}
Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=961112&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Jul 7 03:59:04 2010
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.org.fusesource.hawtdispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import org.apache.activemq.transport.Transport
+import java.util.{LinkedList}
+
+/**
+ * <p>
+ * Defines a simple trait to control the flow of data
+ * between callers and implementations of this trait.
+ * </p>
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait Sink[T] {
+
+ /**
+ * @return true if the sink is full
+ */
+ def full:Boolean
+
+ /**
+ * Attempts to add a value to the sink. If the sink is full,
+ * this method will typically return false. The caller should
+ * try to offer the value again once the refiller is executed.
+ * @param value the value to add to the sink.
+ * @return true if the value was added.
+ */
+ def offer(value:T):Boolean
+
+ /**
+ * The refiller callback of this sink. The refiller is executed
+ * when the sink is interested in receiving more deliveries.
+ */
+ var refiller:Runnable
+}
+
+/**
+ * <p>
+ * A delivery sink which is connected to a transport. It expects the caller's dispatch
+ * queue to be the same as the transport's.
+ * </p>
+ * Both full and offer delegate directly to the transport.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class TransportSink(val transport:Transport) extends Sink[AnyRef] {
+ def full:Boolean = transport.full
+ def offer(value:AnyRef) = transport.offer(value)
+ var refiller:Runnable = null // never run by this class itself
+}
+
+/**
+ * Implements a delivery sink which buffers the overflow of deliveries that
+ * a 'down stream' sink cannot accept when it's full. Installs itself as the
+ * refiller of the down stream sink so the overflow is drained on refill.
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class OverflowSink[T](val downstream:Sink[T]) extends Sink[T] {
+
+ private val overflow = new LinkedList[T]()
+ var refiller: Runnable = null
+
+ downstream.refiller = ^{ drain }
+
+ protected def drain:Unit = {
+ while( overflowed ) {
+ val delivery = overflow.removeFirst
+ if( !downstream.offer(delivery) ) {
+ overflow.addFirst(delivery)
+ return
+ } else {
+ onDelivered(delivery)
+ }
+ }
+ // request a refill once the overflow is empty; a refiller is optional.
+ if( refiller!=null ) { refiller.run }
+ }
+
+ /**
+ * @return true always even when full since those messages just get stored in an
+ * overflow list
+ */
+ def offer(value:T) = {
+ if( overflowed || !downstream.offer(value)) {
+ overflow.addLast(value)
+ } else {
+ onDelivered(value)
+ }
+ true
+ }
+
+ /**
+ * Called for each value that is passed on to the down stream sink.
+ */
+ protected def onDelivered(value:T) = {
+ }
+
+ def overflowed = !overflow.isEmpty
+
+ def full = overflowed || downstream.full
+}
+
+object MapSink {
+ def apply[X,Y](downstream:Sink[X])(func: Y=>X ) = { // adapts a Sink[X] into a Sink[Y] by converting each offered value with func
+ new Sink[Y] {
+ def refiller = downstream.refiller // the refiller is stored on the downstream sink, not on this wrapper
+ def refiller_=(value:Runnable) = downstream.refiller=value
+
+ def full = downstream.full
+ def offer(value:Y) = {
+ downstream.offer(func(value)) // func is applied before the downstream sink decides whether to accept
+ }
+ }
+ }
+}
+
+/**
+ * <p>
+ * A SinkMux multiplexes access to a target sink so that multiple
+ * producers can send data to it concurrently. The SinkMux creates
+ * a new session/sink for each connected producer. The session
+ * uses credit based flow control to cut down the cross thread
+ * events issued.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class SinkMux[T](val target:Sink[T], val queue:DispatchQueue, val sizer:Sizer[T]) extends BaseRetained {
+
+ var sessions = List[Session]() // NOTE(review): grows in open() but is never pruned in close() - confirm intended
+
+ var session_min_credits = 1024*4; // NOTE(review): not referenced anywhere else in this class
+ var session_credit_capacity = 1024*32
+ var session_max_credits = session_credit_capacity;
+
+ val overflow = new OverflowSink[(Session,T)](MapSink(target){_._2}) {
+
+ // Once a value leaves the overflow, then we can credit the
+ // session so that more messages can be accepted.
+ override protected def onDelivered(event:(Session,T)) = {
+ val session = event._1
+ val value = event._2
+ session.credit_adder.merge(sizer.size(value));
+ session.credit_adder.release
+ }
+ }
+ // As messages are delivered, and we credit the sessions,
+ // that triggers the sessions to refill the overflow. No
+ // need to have a refiller action.
+ overflow.refiller = ^ { }
+
+ queue.retain
+ setDisposer(^{
+ source.release
+ queue.release
+ })
+
+ // use an event aggregating source to coalesce multiple events from the same thread.
+ // all the sessions send to the same source.
+ val source = createSource(new ListEventAggregator[(Session,T)](), queue)
+ source.setEventHandler(^{drain_source});
+ source.resume
+
+ def drain_source = {
+ source.getData.foreach { event =>
+ // overflow sinks can always accept more values.
+ overflow.offer(event)
+ }
+ }
+
+ /**
+ * tracks one producer to consumer session / credit window.
+ */
+ class Session(val producer_queue:DispatchQueue) extends Sink[T] {
+
+ // retain since the producer will be using this source to send messages
+ // to the consumer
+ source.retain
+
+ ///////////////////////////////////////////////////
+ // These members are used from the context of the
+ // producer serial dispatch queue
+ ///////////////////////////////////////////////////
+
+ // create a source to coalesce credit events back to the producer side...
+ val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
+ credit_adder.setEventHandler(^{
+ internal_credit(credit_adder.getData.intValue)
+ });
+ credit_adder.resume
+
+ private var credits = 0;
+ private var closed = false
+ var refiller:Runnable = null
+ var _full = false
+
+ def close = {
+ credit_adder.release
+ source.release
+ closed=true
+ }
+
+ override def full = _full
+
+ override def offer(value: T) = {
+ if( _full || closed ) {
+ false
+ } else {
+ credit_adder.retain // paired with the release in overflow's onDelivered
+ internal_credit(-sizer.size(value)) // spend credits for the value's size
+ source.merge((this, value))
+ true
+ }
+ }
+
+ def internal_credit(value:Int) = {
+ credits += value;
+ if( closed || credits <= 0 ) {
+ _full = true
+ } else if( credits==session_max_credits ) { // exact match only; NOTE(review): confirm credits can never exceed the max
+ // refill once we are empty.
+ if( _full ) {
+ _full = false
+ refiller.run
+ }
+ }
+ }
+
+ ///////////////////////////////////////////////////
+ // These members are used from the context of the
+ // consumer serial dispatch queue
+ ///////////////////////////////////////////////////
+
+ private var _capacity = 0
+
+ def credit(value:Int) = ^{
+ internal_credit(value)
+ } >>: producer_queue
+
+ def capacity(value:Int) = {
+ val change = value - _capacity;
+ _capacity = value;
+ credit(change) // only the difference to the previous capacity is credited
+ }
+
+ }
+
+ def open(producer_queue:DispatchQueue):Sink[T] = {
+ val session = createSession(producer_queue)
+ sessions = session :: sessions
+ session.capacity(session_max_credits)
+ session
+ }
+
+ def close(session:Sink[T]) = {
+ session.asInstanceOf[SinkMux[T]#Session].close // expects a sink previously returned by open()
+ }
+
+ protected def createSession(producer_queue:DispatchQueue) = new Session(producer_queue)
+}
+
+/**
+ * A sizer can determine the size of other objects; sinks use it to weigh values against their limits.
+ */
+trait Sizer[T] {
+ def size(value:T):Int
+}
+
+/**
+ * <p>
+ * A delivery sink which buffers deliveries sent to it up to its
+ * maxSize settings after which it starts flow controlling the sender.
+ * </p>
+ *
+ * <p>
+ * It executes the drainer when it has queued values. The drainer
+ * should call poll to get the queued values and then ack the values
+ * once they have been processed to allow additional values to be accepted.
+ * The refiller is executed once the queue is drained.
+ * </p>
+ *
+ * <p>
+ * This class should only be called from a single serial dispatch queue.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class QueueSink[T](val sizer:Sizer[T], var maxSize:Int=1024*32) extends Sink[T] {
+
+ var buffer = new LinkedList[T]()
+ private var size = 0
+
+ var drainer: Runnable = null
+ var refiller: Runnable = null
+
+ def full = size >= maxSize
+ def poll = buffer.poll
+ def unpoll(value:T) = buffer.addFirst(value)
+ def isEmpty = buffer.isEmpty
+
+ private def drain = drainer.run
+
+ def offer(value:T):Boolean = {
+ if( full ) {
+ false
+ } else {
+ size += sizer.size(value)
+ buffer.addLast(value)
+ if( buffer.size == 1 ) {
+ drain
+ }
+ true
+ }
+ }
+
+ def ack(delivery:T) = {
+ // When a message is delivered to the consumer, we release
+ // used capacity in the outbound queue, and can drain the inbound
+ // queue. poll does not release capacity; only ack does, so
+ // every polled value has to be acked once it has been processed.
+ size -= sizer.size(delivery)
+ if( !isEmpty ) {
+ drain
+ } else {
+ refiller.run
+ }
+ }
+
+}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul 7 03:59:04 2010
@@ -20,7 +20,7 @@ import _root_.java.util.LinkedList
import _root_.org.apache.activemq.filter.{Expression, MessageEvaluationContext}
import _root_.org.fusesource.hawtbuf._
import collection.mutable.ListBuffer
-import org.apache.activemq.apollo.broker.{Destination, BufferConversions, Message}
+import org.apache.activemq.apollo.broker.{Sizer, Destination, BufferConversions, Message}
/**
*
@@ -113,6 +113,10 @@ case class StompFrameMessage(frame:Stomp
}
}
+object StompFrame extends Sizer[StompFrame] { // Sizer for frames; handed to SinkMux as its sizer
+ def size(value:StompFrame) = value.size // delegates to the frame's own size member
+}
+
/**
* Represents all the data in a STOMP frame.
*
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul 7 03:59:04 2010
@@ -60,9 +60,7 @@ class StompProtocolHandler extends Proto
protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
class SimpleConsumer(val destination:Destination) extends BaseRetained with DeliveryConsumer {
-
val dispatchQueue = StompProtocolHandler.this.dispatchQueue
- val session_manager = new DeliverySessionManager(outboundChannel, dispatchQueue)
dispatchQueue.retain
setDisposer(^{
@@ -70,27 +68,49 @@ class StompProtocolHandler extends Proto
dispatchQueue.release
})
- def matches(message:Delivery) = true
-
- def connect(producer_queue:DispatchQueue) = new DeliverySession {
- val session = session_manager.open(producer_queue)
+ def matches(delivery:Delivery) = {
+ // TODO add selector support here...
+ delivery.message.isInstanceOf[StompFrameMessage]
+ }
- val consumer = SimpleConsumer.this
+ def connect(p:DeliveryProducer) = new Session {
retain
- def deliver(delivery:Delivery) = {
-// info("Delivering to consumer session")
- session.send(delivery)
- }
+ def producer = p
+ def consumer = SimpleConsumer.this
+
+ val session = session_manager.open(producer.dispatchQueue)
def close = {
session_manager.close(session)
release
}
+
+ // Delegate all the flow control stuff to the session
+ def full = session.full
+ def offer(delivery:Delivery) = {
+ if( session.full ) {
+ false
+ } else {
+ if( delivery.ack ) {
+ producer.ack(delivery)
+ }
+ val frame = delivery.message.asInstanceOf[StompFrameMessage].frame
+ val rc = session.offer(frame)
+ assert(rc, "offer should be accepted since it was not full")
+ true
+ }
+ }
+
+ def refiller = session.refiller
+ def refiller_=(value:Runnable) = { session.refiller=value }
+
}
}
- var outboundChannel:TransportDeliverySink = null
+ var session_manager:SinkMux[StompFrame] = null
+ var connection_sink:Sink[StompFrame] = null
+
var closed = false
var consumer:SimpleConsumer = null
@@ -100,13 +120,10 @@ class StompProtocolHandler extends Proto
private def queue = connection.dispatchQueue
override def onTransportConnected() = {
- outboundChannel = new TransportDeliverySink(connection.transport) {
- override def send(delivery: Delivery) = {
- if( transport.isConnected ) {
- transport.oneway(delivery.message.asInstanceOf[StompFrameMessage].frame, delivery)
- }
- }
- }
+ session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){ x=>x }, dispatchQueue, StompFrame)
+ connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
+ connection_sink.refiller = ^{}
+
connection.connector.broker.getDefaultVirtualHost(
queue.wrap { (host)=>
this.host=host
@@ -115,7 +132,6 @@ class StompProtocolHandler extends Proto
)
}
-
override def onTransportDisconnected() = {
if( !closed ) {
closed=true;
@@ -132,7 +148,7 @@ class StompProtocolHandler extends Proto
}
- def onTransportCommand(command:Any) = {
+ override def onTransportCommand(command:Any) = {
try {
command match {
case StompFrame(Commands.SEND, headers, content, _) =>
@@ -164,7 +180,7 @@ class StompProtocolHandler extends Proto
def on_stomp_connect(headers:HeaderMap) = {
- connection.transport.oneway(StompFrame(Responses.CONNECTED), null)
+ connection_sink.offer(StompFrame(Responses.CONNECTED))
}
def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
@@ -192,6 +208,7 @@ class StompProtocolHandler extends Proto
}
val producer = new DeliveryProducer() {
+ override def dispatchQueue = queue
override def collocate(value:DispatchQueue):Unit = ^{
// TODO:
// if( value.getTargetQueue ne queue.getTargetQueue ) {
@@ -206,11 +223,14 @@ class StompProtocolHandler extends Proto
// don't process frames until we are connected..
connection.transport.suspendRead
- host.router.connect(destiantion, queue, producer) {
+ host.router.connect(destiantion, producer) {
(route) =>
if( !connection.stopped ) {
connection.transport.resumeRead
producerRoute = route
+ producerRoute.refiller = ^{
+ connection.transport.resumeRead
+ }
send_via_route(producerRoute, frame)
}
}
@@ -251,14 +271,14 @@ class StompProtocolHandler extends Proto
// delivery.ref = host.database.createMessageRecord(message.id, content, PROTOCOL)
}
- connection.transport.suspendRead
- delivery.setDisposer(^{
- connection.transport.resumeRead
- })
- route.targets.foreach(consumer=>{
- consumer.deliver(delivery)
- })
- delivery.release;
+ // routes can always accept at least 1 delivery...
+ assert( !route.full )
+ route.offer(delivery)
+ if( route.full ) {
+ // but once it gets full.. suspend reads, so that we don't get more stomp messages
+ // until it's not full anymore.
+ connection.transport.suspendRead
+ }
} else {
// info("Dropping message. No consumers interested in message.")
}
@@ -287,7 +307,7 @@ class StompProtocolHandler extends Proto
if( !connection.stopped ) {
info("Shutting connection down due to: "+msg)
connection.transport.suspendRead
- connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
+ connection.transport.offer(StompFrame(Responses.ERROR, Nil, ascii(msg))) // Transport.offer takes only the command; oneway's 'retained' argument is gone
^ {
connection.stop()
} >>: queue
Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul 7 03:59:04 2010
@@ -33,45 +33,51 @@ import org.fusesource.hawtdispatch.BaseR
class StompBrokerPerfTest extends BaseBrokerPerfSupport {
println(getClass.getClassLoader.getResource("log4j.properties"));
- override def createProducer() = new StompRemoteProducer()
- override def createConsumer() = new StompRemoteConsumer()
- override def getRemoteWireFormat() = "stomp"
+ override def createProducer() = new StompRemoteProducer()
+
+ override def createConsumer() = new StompRemoteConsumer()
+
+ override def getRemoteWireFormat() = "stomp"
}
class StompRemoteConsumer extends RemoteConsumer {
+ var outboundSink: OverflowSink[StompFrame] = null
- def setupSubscription() = {
- val stompDestination = if( destination.getDomain() == Domain.QUEUE_DOMAIN ) {
- ascii("/queue/"+destination.getName().toString());
- } else {
- ascii("/topic/"+destination.getName().toString());
- }
+ def setupSubscription() = {
+ outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
+ outboundSink.refiller = ^{}
+
+ val stompDestination = if (destination.getDomain() == Domain.QUEUE_DOMAIN) {
+ ascii("/queue/" + destination.getName().toString());
+ } else {
+ ascii("/topic/" + destination.getName().toString());
+ }
- var frame = StompFrame(Stomp.Commands.CONNECT);
- transport.oneway(frame, null);
+ var frame = StompFrame(Stomp.Commands.CONNECT);
+ outboundSink.offer(frame);
- var headers:List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
- headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-"+name))
- headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO)
-
- frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
- transport.oneway(frame, null);
- }
-
- def onTransportCommand(command:Object) = {
- var frame = command.asInstanceOf[StompFrame]
- frame match {
- case StompFrame(Responses.CONNECTED, headers, _, _) =>
- case StompFrame(Responses.MESSAGE, headers, content, _) =>
- messageReceived();
- case StompFrame(Responses.ERROR, headers, content, _) =>
- onFailure(new Exception("Server reported an error: " + frame.content));
- case _ =>
- onFailure(new Exception("Unexpected stomp command: " + frame.action));
- }
+ var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+ headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
+ headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name))
+ headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO)
+
+ frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
+ outboundSink.offer(frame);
+ }
+
+ override def onTransportCommand(command: Object) = {
+ var frame = command.asInstanceOf[StompFrame]
+ frame match {
+ case StompFrame(Responses.CONNECTED, headers, _, _) =>
+ case StompFrame(Responses.MESSAGE, headers, content, _) =>
+ messageReceived();
+ case StompFrame(Responses.ERROR, headers, content, _) =>
+ onFailure(new Exception("Server reported an error: " + frame.content));
+ case _ =>
+ onFailure(new Exception("Unexpected stomp command: " + frame.action));
}
+ }
protected def messageReceived() {
if (thinkTime > 0) {
@@ -90,60 +96,69 @@ class StompRemoteConsumer extends Remote
}
class StompRemoteProducer extends RemoteProducer {
+ var outboundSink: OverflowSink[StompFrame] = null
+ var stompDestination: AsciiBuffer = null
+ var frame:StompFrame = null
+
+ def send_next: Unit = {
+ var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+ headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+ if (property != null) {
+ headers ::= (ascii(property), ascii(property));
+ }
+ // var p = this.priority;
+ // if (priorityMod > 0) {
+ // p = if ((counter % priorityMod) == 0) { 0 } else { priority }
+ // }
+
+ var content = ascii(createPayload());
+ frame = StompFrame(Stomp.Commands.SEND, headers, content)
+ drain()
+ }
- var stompDestination:AsciiBuffer = null
-
- def send_next:Unit = {
- var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
- headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
- if (property != null) {
- headers ::= (ascii(property), ascii(property));
- }
-// var p = this.priority;
-// if (priorityMod > 0) {
-// p = if ((counter % priorityMod) == 0) { 0 } else { priority }
-// }
-
- var content = ascii(createPayload());
- val frame = StompFrame(Stomp.Commands.SEND, headers, content)
- val delivery = new BaseRetained()
- delivery.setDisposer(^{
+ def drain() = {
+ if( frame!=null ) {
+ if( !outboundSink.full ) {
+ outboundSink.offer(frame)
+ frame = null
rate.increment();
val task = ^ {
- if( !stopped ) {
+ if (!stopped) {
send_next
}
}
- if( thinkTime > 0 ) {
+ if (thinkTime > 0) {
dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
} else {
dispatchQueue << task
}
- })
- transport.oneway(frame, delivery)
- delivery.release
+ }
}
+ }
- override def setupProducer() = {
- if( destination.getDomain() == Domain.QUEUE_DOMAIN ) {
- stompDestination = ascii("/queue/"+destination.getName().toString());
- } else {
- stompDestination = ascii("/topic/"+destination.getName().toString());
- }
- transport.oneway(StompFrame(Stomp.Commands.CONNECT), null);
- send_next
+ override def setupProducer() = {
+ outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
+ outboundSink.refiller = ^ { drain }
+
+ if (destination.getDomain() == Domain.QUEUE_DOMAIN) {
+ stompDestination = ascii("/queue/" + destination.getName().toString());
+ } else {
+ stompDestination = ascii("/topic/" + destination.getName().toString());
}
+ outboundSink.offer(StompFrame(Stomp.Commands.CONNECT));
+ send_next
+ }
- def onTransportCommand(command:Object) = {
- var frame = command.asInstanceOf[StompFrame]
- frame match {
- case StompFrame(Responses.CONNECTED, headers, _, _) =>
- case StompFrame(Responses.ERROR, headers, content, _) =>
- onFailure(new Exception("Server reported an error: " + frame.content.utf8));
- case _ =>
- onFailure(new Exception("Unexpected stomp command: " + frame.action));
- }
+ override def onTransportCommand(command: Object) = {
+ var frame = command.asInstanceOf[StompFrame]
+ frame match {
+ case StompFrame(Responses.CONNECTED, headers, _, _) =>
+ case StompFrame(Responses.ERROR, headers, content, _) =>
+ onFailure(new Exception("Server reported an error: " + frame.content.utf8));
+ case _ =>
+ onFailure(new Exception("Unexpected stomp command: " + frame.action));
}
+ }
}
Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul 7 03:59:04 2010
@@ -176,11 +176,11 @@ public class TcpTransport extends BaseSe
int bufferSize = 1024*64;
- final LinkedList<OneWay> outbound = new LinkedList<OneWay>();
DataByteArrayOutputStream next_outbound_buffer;
ByteBuffer outbound_buffer;
protected boolean useLocalHost = true;
ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
+ boolean full = false;
private final Runnable CANCEL_HANDLER = new Runnable() {
public void run() {
@@ -357,11 +357,11 @@ public class TcpTransport extends BaseSe
}
- public boolean isFull() {
- return next_outbound_buffer.size() >= bufferSize>>2;
+ public boolean full() {
+ return full;
}
- public void oneway(Object command, Retained retained) {
+ public boolean offer(Object command) {
assert Dispatch.getCurrentQueue() == dispatchQueue;
try {
if (!socketState.is(CONNECTED.class)) {
@@ -372,30 +372,31 @@ public class TcpTransport extends BaseSe
}
} catch (IOException e) {
onTransportFailure(e);
- return;
+ return false;
}
- boolean wasEmpty = next_outbound_buffer.size()==0;
- if (retained!=null && isFull() ) {
- // retaining blocks the sender it is released.
- retained.retain();
- outbound.add(new OneWay(command, retained));
+ if ( full ) {
+ return false;
} else {
try {
wireformat.marshal(command, next_outbound_buffer);
+ if( next_outbound_buffer.size() >= bufferSize>>2 ) {
+ full = true;
+ }
} catch (IOException e) {
onTransportFailure(e);
- return;
+ return false;
}
if ( outbound_buffer.remaining()==0 ) {
writeSource.resume();
}
+ return true;
}
}
/**
- * @retruns true if the outbound has been drained of all objects and there are no in progress writes.
+ * @return true if there are no in progress writes.
*/
private boolean drainOutbound() {
assert Dispatch.getCurrentQueue() == dispatchQueue;
@@ -416,22 +417,15 @@ public class TcpTransport extends BaseSe
outbound_buffer = next_outbound_buffer.toBuffer().toByteBuffer();
next_outbound_buffer = new DataByteArrayOutputStream(prev_size);
} else {
- // marshall all the available frames..
- OneWay oneWay = outbound.poll();
- while (oneWay != null) {
- wireformat.marshal(oneWay.command, next_outbound_buffer);
- if (oneWay.retained != null) {
- oneWay.retained.release();
- }
- if ( isFull() ) {
- oneWay = null;
- } else {
- oneWay = outbound.poll();
+ if( full ) {
+ full = false;
+ listener.onRefill();
+ // If the listener did not have anything for us...
+ if (next_outbound_buffer.size() == 0) {
+ // the source is now drained...
+ return true;
}
- }
-
- if (next_outbound_buffer.size() == 0) {
- // the source is now drained...
+ } else {
return true;
}
}
@@ -442,8 +436,7 @@ public class TcpTransport extends BaseSe
onTransportFailure(e);
return true;
}
-
- return outbound.isEmpty() && outbound_buffer == null;
+ return outbound_buffer.remaining() == 0;
}
private void drainInbound() throws IOException {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java Wed Jul 7 03:59:04 2010
@@ -32,6 +32,9 @@ public class DefaultTransportListener im
public void onTransportCommand(Object command) {
}
+ public void onRefill() {
+ }
+
/**
* An unrecoverable exception has occured on the transport
*
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jul 7 03:59:04 2010
@@ -32,15 +32,15 @@ import org.fusesource.hawtdispatch.Retai
public interface Transport extends Service {
- boolean isFull();
+ boolean full();
/**
- * A one way asynchronous send.
+ * A one way asynchronous send of a command. Only sent if the transport is not full.
*
* @param command
- * @throws IOException
+ * @return true if the command was accepted.
*/
- void oneway(Object command, Retained retained);
+ boolean offer(Object command);
/**
* Returns the current transport listener
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul 7 03:59:04 2010
@@ -109,17 +109,21 @@ public class TransportFilter implements
transportListener.onTransportCommand(command);
}
+ public void onRefill() {
+ transportListener.onRefill();
+ }
+
public String toString() {
return next.toString();
}
- public void oneway(Object command, Retained retained) {
- next.oneway(command, retained);
+ public boolean offer(Object command) {
+ return next.offer(command);
}
- public boolean isFull() {
- return next.isFull();
+ public boolean full() {
+ return next.full();
}
public void onTransportFailure(IOException error) {
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java Wed Jul 7 03:59:04 2010
@@ -30,6 +30,12 @@ public interface TransportListener {
* @param command
*/
void onTransportCommand(Object command);
+
+ /**
+ * transport can now accept more commands for transmission.
+ */
+ void onRefill();
+
/**
* An unrecoverable exception has occured on the transport
* @param error
Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul 7 03:59:04 2010
@@ -154,39 +154,34 @@ public class PipeTransport implements Tr
}
}
- final LinkedList<OneWay> inbound = new LinkedList<OneWay>();
int outbound = 0;
int maxOutbound = 100;
- public boolean isFull() {
+ public boolean full() {
return outbound >= maxOutbound;
}
- public void oneway(Object command, Retained retained) {
+ public boolean offer(Object command) {
if( !connected ) {
- throw new IllegalStateException("Not connected.");
+ return false;
}
- if( isFull() && retained!=null) {
- retained.retain();
- inbound.add(new OneWay(command, retained));
+ if( full() ) {
+ return false;
} else {
- transmit(command, null);
+ transmit(command);
+ return true;
}
}
private void drainInbound() {
- while( !isFull() && !inbound.isEmpty() ) {
- OneWay oneWay = inbound.poll();
- transmit(oneWay.command, oneWay.retained);
+ if( !full() ) {
+ listener.onRefill();
}
}
- private void transmit(Object command, Retained retained) {
+ private void transmit(Object command) {
outbound++;
peer.dispatchSource.merge(command);
- if( retained!=null ) {
- retained.release();
- }
}
public String getRemoteAddress() {