You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:59:05 UTC

svn commit: r961112 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ activemq-stomp/src/test/scala/org/apache/activemq/apollo...

Author: chirino
Date: Wed Jul  7 03:59:04 2010
New Revision: 961112

URL: http://svn.apache.org/viewvc?rev=961112&view=rev
Log:
Implemented simpifiled flow control

Added:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Broker.scala Wed Jul  7 03:59:04 2010
@@ -20,13 +20,13 @@ import _root_.java.io.{File}
 import _root_.java.lang.{String}
 import _root_.org.apache.activemq.util.{FactoryFinder}
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import org.fusesource.hawtdispatch.{Dispatch, DispatchQueue, BaseRetained}
+import org.fusesource.hawtdispatch.{Dispatch}
 import org.fusesource.hawtbuf._
 import ReporterLevel._
 import AsciiBuffer._
-import org.apache.activemq.apollo.dto.{VirtualHostDTO, BrokerDTO}
+import org.apache.activemq.apollo.dto.{BrokerDTO}
 import collection.{JavaConversions, SortedMap}
-import java.util.LinkedList
+import JavaConversions._ 
 
 /**
  * <p>
@@ -120,7 +120,6 @@ object Broker extends Log {
         error("Broker basedir must be defined.")
       }
 
-      import JavaConversions._
       for (host <- config.virtualHosts ) {
         result |= VirtualHost.validate(host, reporter)
       }
@@ -181,7 +180,6 @@ class Broker() extends BaseService with 
 
     // create the runtime objects from the config
     {
-      import JavaConversions._
       dataDirectory = new File(config.basedir)
       defaultVirtualHost = null
       for (c <- config.virtualHosts) {
@@ -235,301 +233,3 @@ class Broker() extends BaseService with 
   } >>: dispatchQueue
 
 }
-
-
-/**
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait QueueLifecyleListener {
-
-    /**
-     * A destination has bean created
-     *
-     * @param queue
-     */
-    def onCreate(queue:Queue);
-
-    /**
-     * A destination has bean destroyed
-     *
-     * @param queue
-     */
-    def onDestroy(queue:Queue);
-
-}
-
-
-
-
-object Queue extends Log {
-  val maxOutboundSize = 1024*1204*5
-}
-
-/**
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class Queue(val destination:Destination, val storeId:Long) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
-  override protected def log = Queue
-
-  override val dispatchQueue:DispatchQueue = createQueue("queue:"+destination);
-  dispatchQueue.setTargetQueue(getRandomThreadQueue)
-  dispatchQueue {
-    debug("created queue for: "+destination)
-  }
-
-  // sequence numbers.. used to track what's in the store.
-  var first_seq = -1L
-  var last_seq = -1L
-  var message_seq_counter=0L
-  def next_message_seq = {
-    val rc = message_seq_counter
-    message_seq_counter += 1
-    rc
-  }
-
-  var count = 0
-
-  val delivery_buffer  = new DeliveryBuffer {
-
-    override def send(delivery: Delivery) = {
-      // Is it a persistent message?
-      if( delivery.ref!=null ) {
-        // next_message_seq
-
-      }
-      super.send(delivery)
-    }
-
-    override def ack(delivery: Delivery) = {
-      super.ack(delivery)
-    }
-  }
-  delivery_buffer.eventHandler = ^{ drain_delivery_buffer }
-
-  val session_manager = new DeliverySessionManager(delivery_buffer, dispatchQueue)
-
-  setDisposer(^{
-    dispatchQueue.release
-    session_manager.release
-  })
-
-  class ConsumerState(val consumer:DeliverySession) {
-    var bound=true
-
-    def deliver(value:Delivery):Unit = {
-      val delivery = value.copy
-      delivery.setDisposer(^{
-        ^{ completed(value) } >>:dispatchQueue
-      })
-      consumer.deliver(delivery);
-      delivery.release
-    }
-
-    def completed(delivery:Delivery) = {
-      // Lets get back on the readyList if  we are still bound.
-      if( bound ) {
-        readyConsumers.addLast(this)
-      }
-      delivery_buffer.ack(delivery)
-    }
-  }
-
-  var allConsumers = Map[DeliveryConsumer,ConsumerState]()
-  val readyConsumers = new LinkedList[ConsumerState]()
-
-  def connected(consumers:List[DeliveryConsumer]) = bind(consumers)
-  def bind(consumers:List[DeliveryConsumer]) = retaining(consumers) {
-      for ( consumer <- consumers ) {
-        val cs = new ConsumerState(consumer.connect(dispatchQueue))
-        allConsumers += consumer->cs
-        readyConsumers.addLast(cs)
-      }
-      drain_delivery_buffer
-    } >>: dispatchQueue
-
-  def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
-      for ( consumer <- consumers ) {
-        allConsumers.get(consumer) match {
-          case Some(cs)=>
-            cs.bound = false
-            cs.consumer.close
-            allConsumers -= consumer
-            readyConsumers.remove(cs)
-          case None=>
-        }
-      }
-    } >>: dispatchQueue
-
-  def disconnected() = throw new RuntimeException("unsupported")
-
-  def collocate(value:DispatchQueue):Unit = {
-    if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
-      println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
-      this.dispatchQueue.setTargetQueue(value.getTargetQueue)
-    }
-  }
-
-
-  def drain_delivery_buffer: Unit = {
-    while (!readyConsumers.isEmpty && !delivery_buffer.isEmpty) {
-      val cs = readyConsumers.removeFirst
-      val delivery = delivery_buffer.receive
-      cs.deliver(delivery)
-    }
-  }
-
-  def connect(producer_queue:DispatchQueue) = new DeliverySession {
-
-    val session = session_manager.open(producer_queue)
-    val consumer = Queue.this
-    retain
-
-    def deliver(delivery:Delivery) = session.send(delivery)
-
-    def close = {
-      session_manager.close(session)
-      release
-    }
-  }
-
-  def matches(message:Delivery) = { true }
-
-//  def open_session(producer_queue:DispatchQueue) = new ConsumerSession {
-//    val consumer = StompQueue.this
-//    val deliveryQueue = new DeliveryOverflowBuffer(delivery_buffer)
-//    retain
-//
-//    def deliver(delivery:Delivery) = using(delivery) {
-//      deliveryQueue.send(delivery)
-//    } >>: queue
-//
-//    def close = {
-//      release
-//    }
-//  }
-
-
-}
-
-class XQueue(val destination:Destination) {
-
-// TODO:
-//    private VirtualHost virtualHost;
-//
-//    Queue() {
-//        this.queue = queue;
-//    }
-//
-//    /*
-//     * (non-Javadoc)
-//     *
-//     * @see
-//     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
-//     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
-//     */
-//    public void deliver(MessageDelivery message, ISourceController<?> source) {
-//        queue.add(message, source);
-//    }
-//
-//    public final void addSubscription(final Subscription<MessageDelivery> sub) {
-//        queue.addSubscription(sub);
-//    }
-//
-//    public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
-//        return queue.removeSubscription(sub);
-//    }
-//
-//    public void start() throws Exception {
-//        queue.start();
-//    }
-//
-//    public void stop() throws Exception {
-//        if (queue != null) {
-//            queue.stop();
-//        }
-//    }
-//
-//    public void shutdown(Runnable onShutdown) throws Exception {
-//        if (queue != null) {
-//            queue.shutdown(onShutdown);
-//        }
-//    }
-//
-//    public boolean hasSelector() {
-//        return false;
-//    }
-//
-//    public boolean matches(MessageDelivery message) {
-//        return true;
-//    }
-//
-//    public VirtualHost getBroker() {
-//        return virtualHost;
-//    }
-//
-//    public void setVirtualHost(VirtualHost virtualHost) {
-//        this.virtualHost = virtualHost;
-//    }
-//
-//    public void setDestination(Destination destination) {
-//        this.destination = destination;
-//    }
-//
-//    public final Destination getDestination() {
-//        return destination;
-//    }
-//
-//    public boolean isDurable() {
-//        return true;
-//    }
-//
-//    public static class QueueSubscription implements BrokerSubscription {
-//        Subscription<MessageDelivery> subscription;
-//        final Queue queue;
-//
-//        public QueueSubscription(Queue queue) {
-//            this.queue = queue;
-//        }
-//
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @see
-//         * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
-//         * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
-//         */
-//        public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
-//            this.subscription = subscription;
-//            queue.addSubscription(subscription);
-//        }
-//
-//        /*
-//         * (non-Javadoc)
-//         *
-//         * @see
-//         * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
-//         * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
-//         */
-//        public void disconnect(ConsumerContext context) {
-//            queue.removeSubscription(subscription);
-//        }
-//
-//        /* (non-Javadoc)
-//         * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
-//         */
-//        public Destination getDestination() {
-//            return queue.getDestination();
-//        }
-//    }
-
-  // TODO:
-  def matches(message:Delivery) = false
-  def deliver(message:Delivery) = {
-    // TODO:
-  }
-
-  def getDestination() = destination
-
-  def shutdown = {}
-}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Connection.scala Wed Jul  7 03:59:04 2010
@@ -39,7 +39,7 @@ object Connection extends Log {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-abstract class Connection() extends TransportListener with BaseService  {
+abstract class Connection() extends DefaultTransportListener with BaseService  {
 
   override protected def log = Connection
 
@@ -47,13 +47,14 @@ abstract class Connection() extends Tran
   val id = next_id
   val dispatchQueue = createQueue(id)
   var stopped = true
-
   var transport:Transport = null
+  var transportSink:TransportSink = null 
 
   override def toString = id
 
   override protected def _start(onCompleted:Runnable) = {
     stopped = false
+    transportSink = new TransportSink(transport)
     transport.setDispatchQueue(dispatchQueue);
     transport.setTransportListener(Connection.this);
     transport.start(onCompleted)
@@ -65,7 +66,7 @@ abstract class Connection() extends Tran
   }
 
 
-  def onTransportFailure(error:IOException) = {
+  override def onTransportFailure(error:IOException) = {
     if (!stopped) {
         onFailure(error);
     }
@@ -76,10 +77,10 @@ abstract class Connection() extends Tran
     transport.stop
   }
 
-  def onTransportDisconnected() = {
-  }
-
-  def onTransportConnected() = {
+  override def onRefill = {
+    if( transportSink.refiller !=null ) {
+      transportSink.refiller.run
+    }
   }
 
 }
@@ -109,7 +110,7 @@ class BrokerConnection(val connector: Co
 
   override def onTransportDisconnected() = protocolHandler.onTransportDisconnected
 
-  def onTransportCommand(command: Object) = {
+  override def onTransportCommand(command: Object) = {
     try {
       protocolHandler.onTransportCommand(command);
     } catch {
@@ -119,6 +120,11 @@ class BrokerConnection(val connector: Co
   }
 
   override def onTransportFailure(error: IOException) = protocolHandler.onTransportFailure(error)
+
+  override def onRefill = {
+    super.onRefill
+    protocolHandler.onRefill
+  }
 }
 
 /**
@@ -133,7 +139,7 @@ class MultiProtocolHandler extends Proto
 
   var connected = false
 
-  def onTransportCommand(command:Any) = {
+  override def onTransportCommand(command:Any) = {
 
     if (!command.isInstanceOf[WireFormat]) {
       throw new ProtocolException("First command should be a WireFormat");
@@ -177,7 +183,7 @@ object ProtocolHandlerFactory {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait ProtocolHandler extends TransportListener {
+trait ProtocolHandler extends DefaultTransportListener {
 
   var connection:BrokerConnection = null;
 
@@ -185,18 +191,10 @@ trait ProtocolHandler extends TransportL
     this.connection = brokerConnection
   }
 
-  def onTransportCommand(command:Any);
-
-  def onTransportFailure(error:IOException) = {
+  override def onTransportFailure(error:IOException) = {
     connection.stop()
   }
 
-  def onTransportDisconnected() = {
-  }
-
-  def onTransportConnected() = {
-  }
-
 }
 
 /**

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:59:04 2010
@@ -1,5 +1,5 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
@@ -19,20 +19,16 @@ package org.apache.activemq.apollo.broke
 import _root_.org.apache.activemq.filter.{MessageEvaluationContext}
 import _root_.java.lang.{String}
 import _root_.org.fusesource.hawtdispatch._
-import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
-import org.apache.activemq.transport.Transport
 import org.fusesource.hawtbuf._
-import org.apache.activemq.util.TreeMap
-import java.util.concurrent.atomic.{AtomicLong, AtomicInteger}
-import java.util.{HashSet, LinkedList}
-
 /**
  * A producer which sends Delivery objects to a delivery consumer.
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait DeliveryProducer {
+  def dispatchQueue:DispatchQueue
   def collocate(queue:DispatchQueue):Unit
+  def ack(message:Delivery) = {}
 }
 
 /**
@@ -41,9 +37,9 @@ trait DeliveryProducer {
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 trait DeliveryConsumer extends Retained {
+  def dispatchQueue:DispatchQueue;
   def matches(message:Delivery)
-  val dispatchQueue:DispatchQueue;
-  def connect(producer:DispatchQueue):DeliverySession
+  def connect(producer:DeliveryProducer):Session
 }
 
 /**
@@ -53,9 +49,9 @@ trait DeliveryConsumer extends Retained 
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-trait DeliverySession {
-  val consumer:DeliveryConsumer
-  def deliver(delivery:Delivery)
+trait Session extends Sink[Delivery] {
+  def producer:DeliveryProducer
+  def consumer:DeliveryConsumer
   def close:Unit
 }
 
@@ -119,14 +115,10 @@ case class StoredMessageRef(id:Long) ext
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-object Delivery {
-//  def apply(o:Delivery) = {
-//    rc new Delivery();
-//    o.message, o.size, o.encoded, o.encoding, o.ack, o.tx_id, o.store_id)
-//  }
+object Delivery extends Sizer[Delivery] {
+  def size(value:Delivery):Int = value.size
 }
 
-
 class Delivery extends BaseRetained {
 
   /**
@@ -153,6 +145,11 @@ class Delivery extends BaseRetained {
 
   def copy() = (new Delivery).set(this)
 
+  /**
+   * Does the producer require this message delivery to be ack?
+   */
+  var ack = false
+
   def set(other:Delivery) = {
     size = other.size
     encoding = other.encoding
@@ -163,273 +160,3 @@ class Delivery extends BaseRetained {
   }
 
 }
-
-/**
- * <p>
- * Defines the interface for objects which you can send flow controlled deliveries to.
- * <p>
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-trait DeliverySink {
-  def full:Boolean
-  def send(delivery:Delivery):Unit
-}
-
-/**
- * <p>
- * A delivery sink which is connected to a transport. It expects the caller's dispatch
- * queue to be the same as the transport's/
- * <p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class TransportDeliverySink(var transport:Transport) extends DeliverySink {
-  def full:Boolean = transport.isFull
-  def send(delivery:Delivery) = if( transport.isConnected ) { transport.oneway(delivery.message, delivery) }
-}
-
-/**
- * <p>
- * A delivery sink which buffers deliveries sent to it up to it's
- * maxSize settings after which it starts flow controlling the sender.
- * <p>
- *
- * <p>
- * It executes the eventHandler when it has buffered deliveries.  The event handler
- * should call receive to get the queued deliveries and then ack the delivery
- * once it has been processed.  The producer will now be resumed until
- * the ack occurs.
- * <p>
- *
- * <p>
- * This class should only be called from a single serial dispatch queue.
- * <p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class DeliveryBuffer(var maxSize:Int=1024*32) extends DeliverySink {
-
-  var deliveries = new LinkedList[Delivery]()
-  private var size = 0
-  var eventHandler: Runnable = null
-
-  def full = size >= maxSize
-
-  def drain = eventHandler.run
-
-  def receive = deliveries.poll
-
-  def isEmpty = deliveries.isEmpty
-
-  def send(delivery:Delivery):Unit = {
-    delivery.retain
-    size += delivery.size
-    deliveries.addLast(delivery)
-    if( deliveries.size == 1 ) {
-      drain
-    }
-  }
-
-  def ack(delivery:Delivery) = {
-    // When a message is delivered to the consumer, we release
-    // used capacity in the outbound queue, and can drain the inbound
-    // queue
-    val wasBlocking = full
-    size -= delivery.size
-    delivery.release
-    if( !isEmpty ) {
-      drain
-    }
-  }
-
-}
-
-/**
- * Implements a delivery sink which sends to a 'down stream' delivery sink. If the
- * down stream delivery sink is full, this sink buffers the overflow deliveries. So that the
- * down stream sink does not need to worry about overflow.
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class DeliveryOverflowBuffer(val delivery_buffer:DeliverySink) extends DeliverySink {
-
-  private var overflow = new LinkedList[Delivery]()
-
-  protected def drainOverflow:Unit = {
-    while( !overflow.isEmpty && !full ) {
-      val delivery = overflow.removeFirst
-      delivery.release
-      send_to_delivery_buffer(delivery)
-    }
-  }
-
-  def send(delivery:Delivery) = {
-    if( full ) {
-      // Deliveries in the overflow queue is remain acquired by us so that
-      // producer that sent it to us gets flow controlled.
-      delivery.retain
-      overflow.addLast(delivery)
-    } else {
-      send_to_delivery_buffer(delivery)
-    }
-  }
-
-  protected def send_to_delivery_buffer(value:Delivery) = {
-    var delivery = value.copy
-    delivery.setDisposer(^{
-      drainOverflow
-    })
-    delivery_buffer.send(delivery)
-    delivery.release
-  }
-
-  def full = delivery_buffer.full
-
-}
-
-/**
- * <p>
- * A DeliverySessionManager manages multiple credit based
- * transmission windows from multiple producers to a single consumer.
- * </p>
- *
- * <p>
- * Producers and consumers are typically executing on different threads and
- * the overhead of doing cross thread flow control is much higher than if
- * a credit window is used.  The credit window allows the producer to
- * send multiple messages to the consumer without needing to wait for
- * consumer events.  Only when the producer runs out of credit, does the
- * he start overflowing the producer.  This class makes heavy
- * use of Dispatch Source objects to coalesce cross thread events
- * like the sending messages or credits.
- * </p>
- *
- * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
- */
-class DeliverySessionManager(val targetSink:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
-
-  var sessions = List[Session]()
-
-  var session_min_credits = 1024*4;
-  var session_credit_capacity = 1024*32
-  var session_max_credits = session_credit_capacity;
-
-  queue.retain
-  setDisposer(^{
-    source.release
-    queue.release
-  })
-
-  // use a event aggregating source to coalesce multiple events from the same thread.
-  // all the sessions send to the same source.
-  val source = createSource(new ListEventAggregator[Delivery](), queue)
-  source.setEventHandler(^{drain_source});
-  source.resume
-
-  def drain_source = {
-    val deliveries = source.getData
-    deliveries.foreach { delivery=>
-      targetSink.send(delivery)
-      delivery.release
-    }
-  }
-
-  /**
-   * tracks one producer to consumer session / credit window.
-   */
-  class Session(val producer_queue:DispatchQueue) extends DeliveryOverflowBuffer(targetSink)  {
-
-    // retain since the producer will be using this source to send messages
-    // to the consumer
-    source.retain
-
-    ///////////////////////////////////////////////////
-    // These members are used from the context of the
-    // producer serial dispatch queue
-    ///////////////////////////////////////////////////
-
-    // create a source to coalesce credit events back to the producer side...
-    val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
-    credit_adder.setEventHandler(^{
-      internal_credit(credit_adder.getData.intValue)
-    });
-    credit_adder.resume
-
-    private var credits = 0;
-
-    private var closed = false
-
-    def close = {
-      credit_adder.release
-      source.release
-      closed=true
-    }
-
-    override def full = credits <= 0
-
-    override def send(delivery: Delivery) = {
-      // retain the storage ref.. the target sink should
-      // release once it no longer needs it.
-      if( delivery.ref !=null ) {
-        delivery.ref.retain
-      }
-      super.send(delivery)
-    }
-
-    override protected def send_to_delivery_buffer(value:Delivery) = {
-      if( !closed ) {
-        var delivery = value.copy
-        credit_adder.retain
-        delivery.setDisposer(^{
-          // once the delivery is received by the consumer, event
-          // the producer the credit.
-          credit_adder.merge(delivery.size);
-          credit_adder.release
-        })
-        internal_credit(-delivery.size)
-        // use the source to send the consumer a delivery event.
-        source.merge(delivery)
-      }
-    }
-
-    def internal_credit(value:Int) = {
-      credits += value;
-      if( closed || credits <= 0 ) {
-        credits = 0
-      } else {
-        drainOverflow
-      }
-    }
-
-    ///////////////////////////////////////////////////
-    // These members are used from the context of the
-    // consumer serial dispatch queue
-    ///////////////////////////////////////////////////
-
-    private var _capacity = 0
-
-    def credit(value:Int) = ^{
-      internal_credit(value)
-    } >>: producer_queue
-
-    def capacity(value:Int) = {
-      val change = value - _capacity;
-      _capacity = value;
-      credit(change)
-    }
-
-  }
-
-  def open(producer_queue:DispatchQueue):DeliverySink = {
-    val session = createSession(producer_queue)
-    sessions = session :: sessions
-    session.capacity(session_max_credits)
-    session
-  }
-
-  def close(session:DeliverySink) = {
-    session.asInstanceOf[DeliverySessionManager#Session].close
-  }
-
-  protected def createSession(producer_queue:DispatchQueue) = new Session(producer_queue)
-}

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala?rev=961112&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Queue.scala Wed Jul  7 03:59:04 2010
@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtdispatch.{DispatchQueue, BaseRetained}
+import collection.{SortedMap}
+import java.util.LinkedList
+
+/**
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait QueueLifecyleListener {
+
+    /**
+     * A destination has bean created
+     *
+     * @param queue
+     */
+    def onCreate(queue:Queue);
+
+    /**
+     * A destination has bean destroyed
+     *
+     * @param queue
+     */
+    def onDestroy(queue:Queue);
+
+}
+
+
+object Queue extends Log {
+  val maxOutboundSize = 1024*1204*5
+}
+
+/**
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class Queue(val destination:Destination, val storeId:Long) extends BaseRetained with Route with DeliveryConsumer with DeliveryProducer with DispatchLogging {
+  override protected def log = Queue
+
+  override val dispatchQueue:DispatchQueue = createQueue("queue:"+destination);
+  dispatchQueue.setTargetQueue(getRandomThreadQueue)
+  dispatchQueue {
+    debug("created queue for: "+destination)
+  }
+
+  // sequence numbers.. used to track what's in the store.
+  var first_seq = -1L
+  var last_seq = -1L
+  var message_seq_counter=0L
+  def next_message_seq = {
+    val rc = message_seq_counter
+    message_seq_counter += 1
+    rc
+  }
+
+  var count = 0
+
+  val pending = new QueueSink[Delivery](Delivery) {
+    override def offer(delivery: Delivery) = {
+      val d = delivery.copy
+      d.ack = true
+      super.offer(d)
+    }
+  }
+  val session_manager = new SinkMux[Delivery](pending, dispatchQueue, Delivery)
+
+  pending.drainer = ^{ drain_delivery_buffer }
+
+  def drain_delivery_buffer: Unit = {
+    while (!readyConsumers.isEmpty && !pending.isEmpty) {
+      val cs = readyConsumers.removeFirst
+      val delivery = pending.poll
+      if( cs.session.offer(delivery) ) {
+        // consumer was not full.. keep him in the ready list
+        readyConsumers.addLast(cs)
+      } else {
+        // consumer full.
+        cs.ready = false
+        pending.unpoll(delivery)
+      }
+    }
+  }
+
+
+  // Use an event source to coalesce cross thread synchronization.
+  val ack_source = createSource(new ListEventAggregator[Delivery](), dispatchQueue)
+  ack_source.setEventHandler(^{drain_acks});
+  ack_source.resume
+  def drain_acks = {
+    ack_source.getData.foreach { ack =>
+      pending.ack(ack)
+    }
+  }
+  override def ack(ack:Delivery) = {
+    ack_source.merge(ack)
+  }
+
+  setDisposer(^{
+    dispatchQueue.release
+    session_manager.release
+  })
+
+  class ConsumerState(val session:Session) extends Runnable {
+    session.refiller = this
+    var bound=true
+    var ready=true
+
+    def run() = {
+      if( bound && !ready ) {
+        ready = true
+        readyConsumers.addLast(this)
+        drain_delivery_buffer
+      }
+    }
+  }
+
+  var allConsumers = Map[DeliveryConsumer,ConsumerState]()
+  val readyConsumers = new LinkedList[ConsumerState]()
+
+  def connected(consumers:List[DeliveryConsumer]) = bind(consumers)
+  def bind(consumers:List[DeliveryConsumer]) = retaining(consumers) {
+      for ( consumer <- consumers ) {
+        val cs = new ConsumerState(consumer.connect(Queue.this))
+        allConsumers += consumer->cs
+        readyConsumers.addLast(cs)
+      }
+      drain_delivery_buffer
+    } >>: dispatchQueue
+
+  def unbind(consumers:List[DeliveryConsumer]) = releasing(consumers) {
+      for ( consumer <- consumers ) {
+        allConsumers.get(consumer) match {
+          case Some(cs)=>
+            cs.bound = false
+            cs.session.close
+            allConsumers -= consumer
+            readyConsumers.remove(cs)
+          case None=>
+        }
+      }
+    } >>: dispatchQueue
+
+  def disconnected() = throw new RuntimeException("unsupported")
+
+  def collocate(value:DispatchQueue):Unit = {
+    if( value.getTargetQueue ne dispatchQueue.getTargetQueue ) {
+      println(dispatchQueue.getLabel+" co-locating with: "+value.getLabel);
+      this.dispatchQueue.setTargetQueue(value.getTargetQueue)
+    }
+  }
+
+  def connect(p:DeliveryProducer) = new Session {
+    retain
+
+    override def consumer = Queue.this
+    override def producer = p
+
+    val session = session_manager.open(producer.dispatchQueue)
+
+    def close = {
+      session_manager.close(session)
+      release
+    }
+
+    // Delegate all the flow control stuff to the session
+    def full = session.full
+    def offer(value:Delivery) = {
+      if( session.full ) {
+        false
+      } else {
+        if( value.ref !=null ) {
+          value.ref.retain
+        }
+        val rc = session.offer(value)
+        assert(rc, "session should accept since it was not full")
+        true
+      }
+    }
+    
+    def refiller = session.refiller
+    def refiller_=(value:Runnable) = { session.refiller=value }
+  }
+
+  def matches(message:Delivery) = { true }
+
+//  def open_session(producer_queue:DispatchQueue) = new ConsumerSession {
+//    val consumer = StompQueue.this
+//    val deliveryQueue = new DeliveryOverflowBuffer(delivery_buffer)
+//    retain
+//
+//    def deliver(delivery:Delivery) = using(delivery) {
+//      deliveryQueue.send(delivery)
+//    } >>: queue
+//
+//    def close = {
+//      release
+//    }
+//  }
+
+
+}
+
+class XQueue(val destination:Destination) {
+
+// TODO:
+//    private VirtualHost virtualHost;
+//
+//    Queue() {
+//        this.queue = queue;
+//    }
+//
+//    /*
+//     * (non-Javadoc)
+//     *
+//     * @see
+//     * org.apache.activemq.broker.DeliveryTarget#deliver(org.apache.activemq
+//     * .broker.MessageDelivery, org.apache.activemq.flow.ISourceController)
+//     */
+//    public void deliver(MessageDelivery message, ISourceController<?> source) {
+//        queue.add(message, source);
+//    }
+//
+//    public final void addSubscription(final Subscription<MessageDelivery> sub) {
+//        queue.addSubscription(sub);
+//    }
+//
+//    public boolean removeSubscription(final Subscription<MessageDelivery> sub) {
+//        return queue.removeSubscription(sub);
+//    }
+//
+//    public void start() throws Exception {
+//        queue.start();
+//    }
+//
+//    public void stop() throws Exception {
+//        if (queue != null) {
+//            queue.stop();
+//        }
+//    }
+//
+//    public void shutdown(Runnable onShutdown) throws Exception {
+//        if (queue != null) {
+//            queue.shutdown(onShutdown);
+//        }
+//    }
+//
+//    public boolean hasSelector() {
+//        return false;
+//    }
+//
+//    public boolean matches(MessageDelivery message) {
+//        return true;
+//    }
+//
+//    public VirtualHost getBroker() {
+//        return virtualHost;
+//    }
+//
+//    public void setVirtualHost(VirtualHost virtualHost) {
+//        this.virtualHost = virtualHost;
+//    }
+//
+//    public void setDestination(Destination destination) {
+//        this.destination = destination;
+//    }
+//
+//    public final Destination getDestination() {
+//        return destination;
+//    }
+//
+//    public boolean isDurable() {
+//        return true;
+//    }
+//
+//    public static class QueueSubscription implements BrokerSubscription {
+//        Subscription<MessageDelivery> subscription;
+//        final Queue queue;
+//
+//        public QueueSubscription(Queue queue) {
+//            this.queue = queue;
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.BrokerSubscription#connect(org.apache.
+//         * activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+//         */
+//        public void connect(ConsumerContext subscription) throws UserAlreadyConnectedException {
+//            this.subscription = subscription;
+//            queue.addSubscription(subscription);
+//        }
+//
+//        /*
+//         * (non-Javadoc)
+//         *
+//         * @see
+//         * org.apache.activemq.broker.BrokerSubscription#disconnect(org.apache
+//         * .activemq.broker.protocol.ProtocolHandler.ConsumerContext)
+//         */
+//        public void disconnect(ConsumerContext context) {
+//            queue.removeSubscription(subscription);
+//        }
+//
+//        /* (non-Javadoc)
+//         * @see org.apache.activemq.broker.BrokerSubscription#getDestination()
+//         */
+//        public Destination getDestination() {
+//            return queue.getDestination();
+//        }
+//    }
+
+  // TODO:
+  def matches(message:Delivery) = false
+  def deliver(message:Delivery) = {
+    // TODO:
+  }
+
+  def getDestination() = destination
+
+  def shutdown = {}
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Router.scala Wed Jul  7 03:59:04 2010
@@ -181,8 +181,8 @@ class Router(val host:VirtualHost) exten
       }
     } >>: dispatchQueue
 
-  def connect(destination:Destination, routeQueue:DispatchQueue, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
-    val route = new DeliveryProducerRoute(destination, routeQueue, producer) {
+  def connect(destination:Destination, producer:DeliveryProducer)(completed: (DeliveryProducerRoute)=>Unit) = {
+    val route = new DeliveryProducerRoute(destination, producer) {
       override def on_connected = {
         completed(this);
       }
@@ -214,8 +214,8 @@ class Router(val host:VirtualHost) exten
  */
 trait Route extends Retained {
 
-  val destination:Destination
-  val dispatchQueue:DispatchQueue
+  def destination:Destination
+  def dispatchQueue:DispatchQueue
   val metric = new AtomicLong();
 
   def connected(targets:List[DeliveryConsumer]):Unit
@@ -228,9 +228,10 @@ trait Route extends Retained {
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class DeliveryProducerRoute(val destination:Destination, override val dispatchQueue:DispatchQueue, val producer:DeliveryProducer) extends BaseRetained with Route with DispatchLogging {
+class DeliveryProducerRoute(val destination:Destination, val producer:DeliveryProducer) extends BaseRetained with Route with Sink[Delivery] with DispatchLogging {
 
   override protected def log = Router
+  override def dispatchQueue = producer.dispatchQueue
 
   // Retain the queue while we are retained.
   dispatchQueue.retain
@@ -238,7 +239,7 @@ class DeliveryProducerRoute(val destinat
     dispatchQueue.release
   })
 
-  var targets = List[DeliverySession]()
+  var targets = List[Session]()
 
   def connected(targets:List[DeliveryConsumer]) = retaining(targets) {
     internal_bind(targets)
@@ -252,7 +253,9 @@ class DeliveryProducerRoute(val destinat
   private def internal_bind(values:List[DeliveryConsumer]) = {
     values.foreach{ x=>
       debug("producer route attaching to conusmer.")
-      targets = x.connect(dispatchQueue) :: targets
+      val target = x.connect(producer);
+      target.refiller = drainer
+      targets ::= target
     }
   }
 
@@ -278,4 +281,50 @@ class DeliveryProducerRoute(val destinat
   protected def on_connected = {}
   protected def on_disconnected = {}
 
+  //
+  // Sink trait implementation.  This Sink overflows
+  // by 1 value.  It's only full when overflowed.  It overflows
+  // when one of the down stream sinks cannot accept the offered
+  // Dispatch.
+  //
+
+  var overflow:Delivery=null
+  var overflowSessions = List[Session]()
+  var refiller:Runnable=null
+
+  def full = overflow!=null
+
+  def offer(value:Delivery) = {
+    if( full ) {
+      false
+    } else {
+      targets.foreach { target=>
+        if( !target.offer(value) ) {
+          overflowSessions ::= target
+        }
+      }
+      if( overflowSessions!=Nil ) {
+        overflow = value
+      }
+      true
+    }
+  }
+
+  val drainer = ^{
+    if( overflow!=null ) {
+      val original = overflowSessions;
+      overflowSessions = Nil
+      original.foreach { target=>
+        if( !target.offer(overflow) ) {
+          overflowSessions ::= target
+        }
+      }
+      if( overflowSessions==Nil ) {
+        overflow = null
+        refiller.run
+      }
+    }
+  }
+
+
 }

Added: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala?rev=961112&view=auto
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala (added)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Sink.scala Wed Jul  7 03:59:04 2010
@@ -0,0 +1,345 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.apollo.broker
+
+import _root_.org.fusesource.hawtdispatch._
+import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import org.apache.activemq.transport.Transport
+import java.util.{LinkedList}
+
+/**
+ * <p>
+ * Defines a simple trait to control the flow of data
+ * between callers and implementations of this trait.
+ * <p>
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+trait Sink[T] {
+
+  /**
+   * @return true if the sink is full
+   */
+  def full:Boolean
+
+  /**
+   * Attempts to add a value to the sink.  If the sink is full,
+   * this method will typically return false.  The caller should
+   * try to offer the value again once the refiller is exectuted.
+   *
+   * @return true if the value was added.
+   */
+  def offer(value:T):Boolean
+
+  /**
+   * Sets a refiller on the sink.  The refiller is executed
+   * when the sink is interested in receiving more deliveries.
+   */
+  var refiller:Runnable
+}
+
+/**
+ * <p>
+ * A delivery sink which is connected to a transport. It expects the caller's dispatch
+ * queue to be the same as the transport's/
+ * <p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class TransportSink(val transport:Transport) extends Sink[AnyRef] {
+  def full:Boolean = transport.full
+  def offer(value:AnyRef) =  transport.offer(value)
+  var refiller:Runnable = null
+}
+
+/**
+ * Implements a delivery sink which buffers the overflow of deliveries that
+ * a 'down stream' sink cannot accept when it's full.
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class OverflowSink[T](val downstream:Sink[T]) extends Sink[T] {
+
+  private var overflow = new LinkedList[T]()
+  var refiller: Runnable = null
+
+  downstream.refiller = ^{ drain }
+
+  protected def drain:Unit = {
+    while( overflowed ) {
+      val delivery = overflow.removeFirst
+      if( !downstream.offer(delivery) ) {
+        overflow.addFirst(delivery)
+        return
+      } else {
+        onDelivered(delivery)
+      }
+    }
+    // request a refill once the overflow is empty...
+    refiller.run
+  }
+
+  /**
+   * @return true always even when full since those messages just get stored in a
+   *         overflow list
+   */
+  def offer(value:T) = {
+    if( overflowed || !downstream.offer(value)) {
+      overflow.addLast(value)
+    } else {
+      onDelivered(value)
+    }
+    true
+  }
+
+  /**
+   * Called for each value what is passed on to the down stream sink.
+   */
+  protected def onDelivered(value:T) = {
+  }
+
+  def overflowed = !overflow.isEmpty
+
+  def full = overflowed || downstream.full
+}
+
+object MapSink {
+  def apply[X,Y](downstream:Sink[X])(func: Y=>X ) = {
+    new Sink[Y] {
+      def refiller = downstream.refiller
+      def refiller_=(value:Runnable) = downstream.refiller=value
+
+      def full = downstream.full
+      def offer(value:Y) = {
+        downstream.offer(func(value))
+      }
+    }
+  }
+}
+
+/**
+ *  <p>
+ * A SinkMux multiplexes access to a target sink so that multiple
+ * producers can send data to it concurrently.  The SinkMux creates
+ * a new session/sink for each connected producer.  The session
+ * uses credit based flow control to cut down the cross thread
+ * events issued.
+ * </p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class SinkMux[T](val target:Sink[T], val queue:DispatchQueue, val sizer:Sizer[T]) extends BaseRetained {
+
+  var sessions = List[Session]()
+
+  var session_min_credits = 1024*4;
+  var session_credit_capacity = 1024*32
+  var session_max_credits = session_credit_capacity;
+
+  val overflow = new OverflowSink[(Session,T)](MapSink(target){_._2}) {
+
+    // Once a value leaves the overflow, then we can credit the
+    // session so that more messages can be accpted.
+    override protected def onDelivered(event:(Session,T)) = {
+      val session = event._1
+      val value = event._2
+      session.credit_adder.merge(sizer.size(value));
+      session.credit_adder.release
+    }
+  }
+  // As messages are delivered, and we credit the sessions,
+  // that triggers the sessions to refill the overflow.  No
+  // need to have a refiller action.
+  overflow.refiller = ^ { }
+
+  queue.retain
+  setDisposer(^{
+    source.release
+    queue.release
+  })
+
+  // use a event aggregating source to coalesce multiple events from the same thread.
+  // all the sessions send to the same source.
+  val source = createSource(new ListEventAggregator[(Session,T)](), queue)
+  source.setEventHandler(^{drain_source});
+  source.resume
+
+  def drain_source = {
+    source.getData.foreach { event =>
+      // overflow sinks can always accept more values.
+      overflow.offer(event)
+    }
+  }
+
+  /**
+   * tracks one producer to consumer session / credit window.
+   */
+  class Session(val producer_queue:DispatchQueue) extends Sink[T] {
+
+    // retain since the producer will be using this source to send messages
+    // to the consumer
+    source.retain
+
+    ///////////////////////////////////////////////////
+    // These members are used from the context of the
+    // producer serial dispatch queue
+    ///////////////////////////////////////////////////
+
+    // create a source to coalesce credit events back to the producer side...
+    val credit_adder = createSource(EventAggregators.INTEGER_ADD , producer_queue)
+    credit_adder.setEventHandler(^{
+      internal_credit(credit_adder.getData.intValue)
+    });
+    credit_adder.resume
+
+    private var credits = 0;
+    private var closed = false
+    var refiller:Runnable = null
+    var _full = false
+
+    def close = {
+      credit_adder.release
+      source.release
+      closed=true
+    }
+
+    override def full = _full
+
+    override def offer(value: T) = {
+      if( _full || closed ) {
+        false
+      } else {
+        credit_adder.retain
+        internal_credit(-sizer.size(value))
+        source.merge((this, value))
+        true
+      }
+    }
+
+    def internal_credit(value:Int) = {
+      credits += value;
+      if( closed || credits <= 0 ) {
+        _full = true
+      } else if( credits==session_max_credits ) {
+        // refill once we are empty.
+        if( _full ) {
+          _full  = false
+          refiller.run
+        }
+      }
+    }
+
+    ///////////////////////////////////////////////////
+    // These members are used from the context of the
+    // consumer serial dispatch queue
+    ///////////////////////////////////////////////////
+
+    private var _capacity = 0
+
+    def credit(value:Int) = ^{
+      internal_credit(value)
+    } >>: producer_queue
+
+    def capacity(value:Int) = {
+      val change = value - _capacity;
+      _capacity = value;
+      credit(change)
+    }
+
+  }
+
+  def open(producer_queue:DispatchQueue):Sink[T] = {
+    val session = createSession(producer_queue)
+    sessions = session :: sessions
+    session.capacity(session_max_credits)
+    session
+  }
+
+  def close(session:Sink[T]) = {
+    session.asInstanceOf[SinkMux[T]#Session].close
+  }
+
+  protected def createSession(producer_queue:DispatchQueue) = new Session(producer_queue)
+}
+
+/**
+ * A sizer can determine the size of other objects.
+ */
+trait Sizer[T] {
+  def size(value:T):Int
+}
+
+/**
+ * <p>
+ * A delivery sink which buffers deliveries sent to it up to it's
+ * maxSize settings after which it starts flow controlling the sender.
+ * <p>
+ *
+ * <p>
+ * It executes the drainer when it has queued values.  The drainer
+ * should call poll to get the queued values and then ack the values
+ * once they have been processed to allow additional values to be accepted.
+ * The refiller is executed once the the queue is drained.
+ * <p>
+ *
+ * <p>
+ * This class should only be called from a single serial dispatch queue.
+ * <p>
+ *
+ * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
+ */
+class QueueSink[T](val sizer:Sizer[T], var maxSize:Int=1024*32) extends Sink[T] {
+
+  var buffer = new LinkedList[T]()
+  private var size = 0
+
+  var drainer: Runnable = null
+  var refiller: Runnable = null
+
+  def full = size >= maxSize
+  def poll = buffer.poll
+  def unpoll(value:T) = buffer.addFirst(value)
+  def isEmpty = buffer.isEmpty
+
+  private def drain = drainer.run
+
+  def offer(value:T):Boolean = {
+    if( full ) {
+      false
+    } else {
+      size += sizer.size(value)
+      buffer.addLast(value)
+      if( buffer.size == 1 ) {
+        drain
+      }
+      true
+    }
+  }
+
+  def ack(delivery:T) = {
+    // When a message is delivered to the consumer, we release
+    // used capacity in the outbound queue, and can drain the inbound
+    // queue
+    val wasBlocking = full
+    size -= sizer.size(delivery)
+    if( !isEmpty ) {
+      drain
+    } else {
+      refiller.run
+    }
+  }
+
+}

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompFrame.scala Wed Jul  7 03:59:04 2010
@@ -20,7 +20,7 @@ import _root_.java.util.LinkedList
 import _root_.org.apache.activemq.filter.{Expression, MessageEvaluationContext}
 import _root_.org.fusesource.hawtbuf._
 import collection.mutable.ListBuffer
-import org.apache.activemq.apollo.broker.{Destination, BufferConversions, Message}
+import org.apache.activemq.apollo.broker.{Sizer, Destination, BufferConversions, Message}
 
 /**
  *
@@ -113,6 +113,10 @@ case class StompFrameMessage(frame:Stomp
   }
 }
 
+object StompFrame extends Sizer[StompFrame] {
+  def size(value:StompFrame) = value.size   
+}
+
 /**
  * Represents all the data in a STOMP frame.
  *

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:59:04 2010
@@ -60,9 +60,7 @@ class StompProtocolHandler extends Proto
   protected def dispatchQueue:DispatchQueue = connection.dispatchQueue
 
   class SimpleConsumer(val destination:Destination) extends BaseRetained with DeliveryConsumer {
-
     val dispatchQueue = StompProtocolHandler.this.dispatchQueue
-    val session_manager = new DeliverySessionManager(outboundChannel, dispatchQueue)
 
     dispatchQueue.retain
     setDisposer(^{
@@ -70,27 +68,49 @@ class StompProtocolHandler extends Proto
       dispatchQueue.release
     })
 
-    def matches(message:Delivery) = true
-
-    def connect(producer_queue:DispatchQueue) = new DeliverySession {
-      val session = session_manager.open(producer_queue)
+    def matches(delivery:Delivery) = {
+      // TODO add selector support here...
+      delivery.message.isInstanceOf[StompFrameMessage]
+    }
 
-      val consumer = SimpleConsumer.this
+    def connect(p:DeliveryProducer) = new Session {
       retain
 
-      def deliver(delivery:Delivery) =  {
-//        info("Delivering to consumer session")
-        session.send(delivery)
-      }
+      def producer = p
+      def consumer = SimpleConsumer.this
+
+      val session = session_manager.open(producer.dispatchQueue)
 
       def close = {
         session_manager.close(session)
         release
       }
+
+      // Delegate all the flow control stuff to the session
+      def full = session.full
+      def offer(delivery:Delivery) = {
+        if( session.full ) {
+          false
+        } else {
+          if( delivery.ack ) {
+            producer.ack(delivery)
+          }
+          val frame = delivery.message.asInstanceOf[StompFrameMessage].frame
+          val rc = session.offer(frame)
+          assert(rc, "offer should be accepted since it was not full")
+          true
+        }
+      }
+      
+      def refiller = session.refiller
+      def refiller_=(value:Runnable) = { session.refiller=value }
+
     }
   }
 
-  var outboundChannel:TransportDeliverySink = null
+  var session_manager:SinkMux[StompFrame] = null
+  var connection_sink:Sink[StompFrame] = null
+
   var closed = false
   var consumer:SimpleConsumer = null
 
@@ -100,13 +120,10 @@ class StompProtocolHandler extends Proto
   private def queue = connection.dispatchQueue
 
   override def onTransportConnected() = {
-    outboundChannel = new TransportDeliverySink(connection.transport) {
-      override def send(delivery: Delivery) = {
-        if( transport.isConnected ) {
-          transport.oneway(delivery.message.asInstanceOf[StompFrameMessage].frame, delivery)
-        }
-      }
-    }
+    session_manager = new SinkMux[StompFrame]( MapSink(connection.transportSink){ x=>x }, dispatchQueue, StompFrame)
+    connection_sink = new OverflowSink(session_manager.open(dispatchQueue));
+    connection_sink.refiller = ^{}
+    
     connection.connector.broker.getDefaultVirtualHost(
       queue.wrap { (host)=>
         this.host=host
@@ -115,7 +132,6 @@ class StompProtocolHandler extends Proto
     )
   }
 
-
   override def onTransportDisconnected() = {
     if( !closed ) {
       closed=true;
@@ -132,7 +148,7 @@ class StompProtocolHandler extends Proto
   }
 
 
-  def onTransportCommand(command:Any) = {
+  override def onTransportCommand(command:Any) = {
     try {
       command match {
         case StompFrame(Commands.SEND, headers, content, _) =>
@@ -164,7 +180,7 @@ class StompProtocolHandler extends Proto
 
 
   def on_stomp_connect(headers:HeaderMap) = {
-    connection.transport.oneway(StompFrame(Responses.CONNECTED), null)
+    connection_sink.offer(StompFrame(Responses.CONNECTED))
   }
 
   def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
@@ -192,6 +208,7 @@ class StompProtocolHandler extends Proto
           }
 
           val producer = new DeliveryProducer() {
+            override def dispatchQueue = queue
             override def collocate(value:DispatchQueue):Unit = ^{
 //              TODO:
 //              if( value.getTargetQueue ne queue.getTargetQueue ) {
@@ -206,11 +223,14 @@ class StompProtocolHandler extends Proto
 
           // don't process frames until we are connected..
           connection.transport.suspendRead
-          host.router.connect(destiantion, queue, producer) {
+          host.router.connect(destiantion, producer) {
             (route) =>
               if( !connection.stopped ) {
                 connection.transport.resumeRead
                 producerRoute = route
+                producerRoute.refiller = ^{
+                  connection.transport.resumeRead
+                }
                 send_via_route(producerRoute, frame)
               }
           }
@@ -251,14 +271,14 @@ class StompProtocolHandler extends Proto
 //        delivery.ref = host.database.createMessageRecord(message.id, content, PROTOCOL)
       }
 
-      connection.transport.suspendRead
-      delivery.setDisposer(^{
-        connection.transport.resumeRead
-      })
-      route.targets.foreach(consumer=>{
-        consumer.deliver(delivery)
-      })
-      delivery.release;
+      // routes can allways accept at least 1 delivery...
+      assert( !route.full )
+      route.offer(delivery)
+      if( route.full ) {
+        // but once it gets full.. suspend, so that we get more stomp messages
+        // until it's not full anymore.
+        connection.transport.suspendRead
+      }
     } else {
       // info("Dropping message.  No consumers interested in message.")
     }
@@ -287,7 +307,7 @@ class StompProtocolHandler extends Proto
     if( !connection.stopped ) {
       info("Shutting connection down due to: "+msg)
       connection.transport.suspendRead
-      connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
+      connection.transport.offer(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
       ^ {
         connection.stop()
       } >>: queue

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 03:59:04 2010
@@ -33,45 +33,51 @@ import org.fusesource.hawtdispatch.BaseR
 class StompBrokerPerfTest extends BaseBrokerPerfSupport {
   println(getClass.getClassLoader.getResource("log4j.properties"));
 
-    override def createProducer() =  new StompRemoteProducer()
-    override def createConsumer() = new StompRemoteConsumer()
-    override def getRemoteWireFormat() = "stomp"
+  override def createProducer() = new StompRemoteProducer()
+
+  override def createConsumer() = new StompRemoteConsumer()
+
+  override def getRemoteWireFormat() = "stomp"
 
 }
 
 class StompRemoteConsumer extends RemoteConsumer {
+  var outboundSink: OverflowSink[StompFrame] = null
 
-    def setupSubscription() = {
-        val stompDestination = if( destination.getDomain() == Domain.QUEUE_DOMAIN ) {
-            ascii("/queue/"+destination.getName().toString());
-        } else {
-            ascii("/topic/"+destination.getName().toString());
-        }
+  def setupSubscription() = {
+    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
+    outboundSink.refiller = ^{}
+
+    val stompDestination = if (destination.getDomain() == Domain.QUEUE_DOMAIN) {
+      ascii("/queue/" + destination.getName().toString());
+    } else {
+      ascii("/topic/" + destination.getName().toString());
+    }
 
-        var frame = StompFrame(Stomp.Commands.CONNECT);
-        transport.oneway(frame, null);
+    var frame = StompFrame(Stomp.Commands.CONNECT);
+    outboundSink.offer(frame);
 
-        var headers:List[(AsciiBuffer, AsciiBuffer)] = Nil
-        headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
-        headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-"+name))
-        headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO)
-
-        frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
-        transport.oneway(frame, null);
-    }
-
-    def onTransportCommand(command:Object) = {
-      var frame = command.asInstanceOf[StompFrame]
-      frame match {
-        case StompFrame(Responses.CONNECTED, headers, _, _) =>
-        case StompFrame(Responses.MESSAGE, headers, content, _) =>
-          messageReceived();
-        case StompFrame(Responses.ERROR, headers, content, _) =>
-          onFailure(new Exception("Server reported an error: " + frame.content));
-        case _ =>
-          onFailure(new Exception("Unexpected stomp command: " + frame.action));
-      }
+    var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+    headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
+    headers ::= (Stomp.Headers.Subscribe.ID, ascii("stomp-sub-" + name))
+    headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO)
+
+    frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
+    outboundSink.offer(frame);
+  }
+
+  override def onTransportCommand(command: Object) = {
+    var frame = command.asInstanceOf[StompFrame]
+    frame match {
+      case StompFrame(Responses.CONNECTED, headers, _, _) =>
+      case StompFrame(Responses.MESSAGE, headers, content, _) =>
+        messageReceived();
+      case StompFrame(Responses.ERROR, headers, content, _) =>
+        onFailure(new Exception("Server reported an error: " + frame.content));
+      case _ =>
+        onFailure(new Exception("Unexpected stomp command: " + frame.action));
     }
+  }
 
   protected def messageReceived() {
     if (thinkTime > 0) {
@@ -90,60 +96,69 @@ class StompRemoteConsumer extends Remote
 }
 
 class StompRemoteProducer extends RemoteProducer {
+  var outboundSink: OverflowSink[StompFrame] = null
+  var stompDestination: AsciiBuffer = null
+  var frame:StompFrame = null
+
+  def send_next: Unit = {
+    var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+    headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+    if (property != null) {
+      headers ::= (ascii(property), ascii(property));
+    }
+    //    var p = this.priority;
+    //    if (priorityMod > 0) {
+    //        p = if ((counter % priorityMod) == 0) { 0 } else { priority }
+    //    }
+
+    var content = ascii(createPayload());
+    frame = StompFrame(Stomp.Commands.SEND, headers, content)
+    drain()
+  }
 
-    var stompDestination:AsciiBuffer = null
-
-    def send_next:Unit = {
-      var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-      headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
-      if (property != null) {
-          headers ::= (ascii(property), ascii(property));
-      }
-//    var p = this.priority;
-//    if (priorityMod > 0) {
-//        p = if ((counter % priorityMod) == 0) { 0 } else { priority }
-//    }
-
-      var content = ascii(createPayload());
-      val frame = StompFrame(Stomp.Commands.SEND, headers, content)
-      val delivery = new BaseRetained()
-      delivery.setDisposer(^{
+  def drain() = {
+    if( frame!=null ) {
+      if( !outboundSink.full ) {
+        outboundSink.offer(frame)
+        frame = null
         rate.increment();
         val task = ^ {
-          if( !stopped ) {
+          if (!stopped) {
             send_next
           }
         }
-        if( thinkTime > 0 ) {
+        if (thinkTime > 0) {
           dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
         } else {
           dispatchQueue << task
         }
-      })
-      transport.oneway(frame, delivery)
-      delivery.release
+      }
     }
+  }
 
-    override def setupProducer() = {
-      if( destination.getDomain() == Domain.QUEUE_DOMAIN  ) {
-          stompDestination = ascii("/queue/"+destination.getName().toString());
-      } else {
-          stompDestination = ascii("/topic/"+destination.getName().toString());
-      }
-      transport.oneway(StompFrame(Stomp.Commands.CONNECT), null);
-      send_next
+  override def setupProducer() = {
+    outboundSink = new OverflowSink[StompFrame](MapSink(transportSink){ x=>x })
+    outboundSink.refiller = ^ { drain }
+
+    if (destination.getDomain() == Domain.QUEUE_DOMAIN) {
+      stompDestination = ascii("/queue/" + destination.getName().toString());
+    } else {
+      stompDestination = ascii("/topic/" + destination.getName().toString());
     }
+    outboundSink.offer(StompFrame(Stomp.Commands.CONNECT));
+    send_next
+  }
 
-    def onTransportCommand(command:Object) = {
-      var frame = command.asInstanceOf[StompFrame]
-      frame match {
-        case StompFrame(Responses.CONNECTED, headers, _, _) =>
-        case StompFrame(Responses.ERROR, headers, content, _) =>
-          onFailure(new Exception("Server reported an error: " + frame.content.utf8));
-        case _ =>
-          onFailure(new Exception("Unexpected stomp command: " + frame.action));
-      }
+  override def onTransportCommand(command: Object) = {
+    var frame = command.asInstanceOf[StompFrame]
+    frame match {
+      case StompFrame(Responses.CONNECTED, headers, _, _) =>
+      case StompFrame(Responses.ERROR, headers, content, _) =>
+        onFailure(new Exception("Server reported an error: " + frame.content.utf8));
+      case _ =>
+        onFailure(new Exception("Unexpected stomp command: " + frame.action));
     }
+  }
 
 }
 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:59:04 2010
@@ -176,11 +176,11 @@ public class TcpTransport extends BaseSe
 
     int bufferSize = 1024*64;
 
-    final LinkedList<OneWay> outbound = new LinkedList<OneWay>();
     DataByteArrayOutputStream next_outbound_buffer;
     ByteBuffer outbound_buffer;
     protected boolean useLocalHost = true;
     ByteBuffer readBuffer = ByteBuffer.allocate(bufferSize);
+    boolean full = false;
 
     private final Runnable CANCEL_HANDLER = new Runnable() {
         public void run() {
@@ -357,11 +357,11 @@ public class TcpTransport extends BaseSe
     }
 
 
-    public boolean isFull() {
-        return next_outbound_buffer.size() >= bufferSize>>2;
+    public boolean full() {
+        return full;
     }
 
-    public void oneway(Object command, Retained retained) {
+    public boolean offer(Object command) {
         assert Dispatch.getCurrentQueue() == dispatchQueue;
         try {
             if (!socketState.is(CONNECTED.class)) {
@@ -372,30 +372,31 @@ public class TcpTransport extends BaseSe
             }
         } catch (IOException e) {
             onTransportFailure(e);
-            return;
+            return false;
         }
 
-        boolean wasEmpty = next_outbound_buffer.size()==0;
-        if (retained!=null && isFull() ) {
-            // retaining blocks the sender it is released.
-            retained.retain();
-            outbound.add(new OneWay(command, retained));
+        if ( full ) {
+            return false;
         } else {
             try {
                 wireformat.marshal(command, next_outbound_buffer);
+                if( next_outbound_buffer.size() >= bufferSize>>2 ) {
+                    full  = true;
+                }
             } catch (IOException e) {
                 onTransportFailure(e);
-                return;
+                return false;
             }
             if ( outbound_buffer.remaining()==0 ) {
                 writeSource.resume();
             }
+            return true;
         }
 
     }
 
     /**
-     * @retruns true if the outbound has been drained of all objects and there are no in progress writes.
+     * @retruns true if there are no in progress writes.
      */
     private boolean drainOutbound() {
         assert Dispatch.getCurrentQueue() == dispatchQueue;
@@ -416,22 +417,15 @@ public class TcpTransport extends BaseSe
                         outbound_buffer = next_outbound_buffer.toBuffer().toByteBuffer();
                         next_outbound_buffer = new DataByteArrayOutputStream(prev_size);
                     } else {
-                        // marshall all the available frames..
-                        OneWay oneWay = outbound.poll();
-                        while (oneWay != null) {
-                            wireformat.marshal(oneWay.command, next_outbound_buffer);
-                            if (oneWay.retained != null) {
-                                oneWay.retained.release();
-                            }
-                            if ( isFull() ) {
-                                oneWay = null;
-                            } else {
-                                oneWay = outbound.poll();
+                        if( full ) {
+                            full = false;
+                            listener.onRefill();
+                            // If the listener did not have anything for us...
+                            if (next_outbound_buffer.size() == 0) {
+                                // the source is now drained...
+                                return true;
                             }
-                        }
-
-                        if (next_outbound_buffer.size() == 0) {
-                            // the source is now drained...
+                        } else {
                             return true;
                         }
                     }
@@ -442,8 +436,7 @@ public class TcpTransport extends BaseSe
             onTransportFailure(e);
             return true;
         }
-
-        return outbound.isEmpty() && outbound_buffer == null;
+        return outbound_buffer.remaining() == 0;
     }
 
     private void drainInbound() throws IOException {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/DefaultTransportListener.java Wed Jul  7 03:59:04 2010
@@ -32,6 +32,9 @@ public class DefaultTransportListener im
     public void onTransportCommand(Object command) {
     }
 
+    public void onRefill() {
+    }
+
     /**
      * An unrecoverable exception has occured on the transport
      * 

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jul  7 03:59:04 2010
@@ -32,15 +32,15 @@ import org.fusesource.hawtdispatch.Retai
 public interface Transport extends Service {
 
 
-    boolean isFull();
+    boolean full();
 
     /**
-     * A one way asynchronous send.
+     * A one way asynchronous send of a command.  Only sent if the the transport is not full.
      * 
      * @param command
-     * @throws IOException
+     * @return true if the command was accepted.
      */
-    void oneway(Object command, Retained retained);
+    boolean offer(Object command);
 
     /**
      * Returns the current transport listener

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul  7 03:59:04 2010
@@ -109,17 +109,21 @@ public class TransportFilter implements 
         transportListener.onTransportCommand(command);
     }
 
+    public void onRefill() {
+        transportListener.onRefill();
+    }
+
 
     public String toString() {
         return next.toString();
     }
 
-    public void oneway(Object command, Retained retained) {
-        next.oneway(command, retained);
+    public boolean  offer(Object command) {
+        return next.offer(command);
     }
 
-    public boolean isFull() {
-        return next.isFull();
+    public boolean full() {
+        return next.full();
     }
 
     public void onTransportFailure(IOException error) {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportListener.java Wed Jul  7 03:59:04 2010
@@ -30,6 +30,12 @@ public interface TransportListener {
      * @param command
      */
     void onTransportCommand(Object command);
+
+    /**
+     * transport can now accept more commands for transmission. 
+     */
+    void onRefill();
+
     /**
      * An unrecoverable exception has occured on the transport
      * @param error

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961112&r1=961111&r2=961112&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul  7 03:59:04 2010
@@ -154,39 +154,34 @@ public class PipeTransport implements Tr
         }
     }
 
-    final LinkedList<OneWay> inbound = new LinkedList<OneWay>();
     int outbound = 0;
     int maxOutbound = 100;
 
-    public boolean isFull() {
+    public boolean full() {
         return outbound >= maxOutbound;
     }
 
-    public void oneway(Object command, Retained retained) {
+    public boolean offer(Object command) {
         if( !connected ) {
-            throw new IllegalStateException("Not connected.");
+            return false;
         }
-        if( isFull() && retained!=null) {
-            retained.retain();
-            inbound.add(new OneWay(command, retained));
+        if( full() ) {
+            return false;
         } else {
-            transmit(command, null);
+            transmit(command);
+            return true;
         }
     }
 
     private void drainInbound() {
-        while( !isFull() && !inbound.isEmpty() ) {
-            OneWay oneWay = inbound.poll();
-            transmit(oneWay.command, oneWay.retained);
+        if( !full() ) {
+            listener.onRefill();
         }
     }
 
-    private void transmit(Object command, Retained retained) {
+    private void transmit(Object command) {
         outbound++;
         peer.dispatchSource.merge(command);
-        if( retained!=null ) {
-            retained.release();
-        }
     }
 
     public String getRemoteAddress() {