You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2010/07/07 05:45:18 UTC

svn commit: r961077 - in /activemq/sandbox/activemq-apollo-actor: activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/ activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/ activemq-stomp/src/test/scala/org/apache/activemq/apollo...

Author: chirino
Date: Wed Jul  7 03:45:17 2010
New Revision: 961077

URL: http://svn.apache.org/viewvc?rev=961077&view=rev
Log:
transport tweaks to get perf up

Removed:
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/CompletionCallback.java
Modified:
    activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
    activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
    activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
    activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java

Modified: activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-broker/src/main/scala/org/apache/activemq/apollo/broker/Delivery.scala Wed Jul  7 03:45:17 2010
@@ -22,6 +22,7 @@ import _root_.java.lang.{String}
 import _root_.org.apache.activemq.util.buffer.{Buffer, AsciiBuffer}
 import _root_.org.fusesource.hawtdispatch._
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import org.apache.activemq.transport.Transport
 
 trait DeliveryProducer {
   def collocate(queue:DispatchQueue):Unit
@@ -384,11 +385,21 @@ case class Delivery (
 //    }
 //}
 
+trait DeliverySink {
+  def full:Boolean
+  def send(delivery:Delivery):Unit
+}
+
+class TransportDeliverySink(val transport:Transport) extends DeliverySink {
+  def full:Boolean = transport.isFull
+  def send(delivery:Delivery) = transport.oneway(delivery.message, delivery)
+}
+
 /**
  *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
-class DeliveryBuffer(var maxSize:Int=1024*32) {
+class DeliveryBuffer(var maxSize:Int=1024*32) extends DeliverySink {
 
   var deliveries = new LinkedList[Delivery]()
   private var size = 0
@@ -425,7 +436,7 @@ class DeliveryBuffer(var maxSize:Int=102
 
 }
 
-class DeliveryOverflowBuffer(val delivery_buffer:DeliveryBuffer) {
+class DeliveryOverflowBuffer(val delivery_buffer:DeliverySink) extends DeliverySink {
 
   private var overflow = new LinkedList[Delivery]()
 
@@ -433,7 +444,7 @@ class DeliveryOverflowBuffer(val deliver
     while( !overflow.isEmpty && !full ) {
       val delivery = overflow.removeFirst
       delivery.release
-      send_to_delivery_queue(delivery)
+      send_to_delivery_buffer(delivery)
     }
   }
 
@@ -444,11 +455,11 @@ class DeliveryOverflowBuffer(val deliver
       delivery.retain
       overflow.addLast(delivery)
     } else {
-      send_to_delivery_queue(delivery)
+      send_to_delivery_buffer(delivery)
     }
   }
 
-  protected def send_to_delivery_queue(value:Delivery) = {
+  protected def send_to_delivery_buffer(value:Delivery) = {
     var delivery = Delivery(value)
     delivery.setDisposer(^{
       drainOverflow
@@ -461,7 +472,7 @@ class DeliveryOverflowBuffer(val deliver
 
 }
 
-class DeliverySessionManager(val delivery_buffer:DeliveryBuffer, val queue:DispatchQueue) extends BaseRetained {
+class DeliverySessionManager(val delivery_buffer:DeliverySink, val queue:DispatchQueue) extends BaseRetained {
 
   var sessions = List[SessionServer]()
 
@@ -524,7 +535,7 @@ class DeliverySessionManager(val deliver
 
       override def full = credits <= 0
 
-      override protected def send_to_delivery_queue(value:Delivery) = {
+      override protected def send_to_delivery_buffer(value:Delivery) = {
         var delivery = Delivery(value)
         delivery.setDisposer(^{
           // This is called from the server/consumer thread

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompProtocol.scala Wed Jul  7 03:45:17 2010
@@ -28,7 +28,6 @@ import AsciiBuffer._
 import Stomp._
 import BufferConversions._
 import StompFrameConstants._
-import org.apache.activemq.transport.CompletionCallback
 import java.io.IOException
 
 
@@ -90,31 +89,17 @@ class StompProtocolHandler extends Proto
     }
   }
 
-  val outboundChannel = new DeliveryBuffer
+  var outboundChannel:TransportDeliverySink = null
   var closed = false
   var consumer:SimpleConsumer = null
 
   var producerRoute:DeliveryProducerRoute=null
   var host:VirtualHost = null
 
-  outboundChannel.eventHandler = ^{
-    var delivery = outboundChannel.receive
-    while( delivery!=null ) {
-      connection.transport.oneway(delivery.message, new CompletionCallback() {
-        def onCompletion() = {
-          outboundChannel.ack(delivery)
-        }
-        def onFailure(e:Exception) = {
-          connection.onFailure(e)
-        }
-      });
-      delivery = outboundChannel.receive
-    }
-  }
-
   private def queue = connection.dispatchQueue
 
   override def onTransportConnected() = {
+    outboundChannel = new TransportDeliverySink(connection.transport)
     connection.broker.runtime.getDefaultVirtualHost(
       queue.wrap { (host)=>
         this.host=host
@@ -172,7 +157,7 @@ class StompProtocolHandler extends Proto
 
 
   def on_stomp_connect(headers:HeaderMap) = {
-    connection.transport.oneway(StompFrame(Responses.CONNECTED))
+    connection.transport.oneway(StompFrame(Responses.CONNECTED), null)
   }
 
   def get(headers:HeaderMap, name:AsciiBuffer):Option[AsciiBuffer] = {
@@ -267,7 +252,7 @@ class StompProtocolHandler extends Proto
   private def die(msg:String) = {
     info("Shutting connection down due to: "+msg)
     connection.transport.suspendRead
-    connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)))
+    connection.transport.oneway(StompFrame(Responses.ERROR, Nil, ascii(msg)), null)
     ^ {
       connection.stop()
     } ->: queue

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/main/scala/org/apache/activemq/apollo/stomp/StompWireFormat.scala Wed Jul  7 03:45:17 2010
@@ -74,8 +74,19 @@ class StompWireFormat extends WireFormat
     ByteBuffer.wrap(Array(x));
   }
 
+
   def marshal(command:Any, os:DataOutput) = {
+    marshal(command.asInstanceOf[StompFrame], os)
+  }
+
+  def marshal(command:Any):Buffer= {
     val frame = command.asInstanceOf[StompFrame]
+    val os = new DataByteArrayOutputStream(frame.size);
+    marshal(frame, os)
+    os.toBuffer
+  }
+
+  def marshal(frame:StompFrame, os:DataOutput) = {
     frame.action.writeTo(os)
     os.write(NEWLINE)
 
@@ -102,15 +113,6 @@ class StompWireFormat extends WireFormat
     END_OF_FRAME_BUFFER.writeTo(os)
   }
 
-  def marshal(command:Any):Buffer= {
-    val frame = command.asInstanceOf[StompFrame]
-    // make a little bigger since size can be an estimate and we want to avoid
-    // a capacity re-size.
-    val os = new DataByteArrayOutputStream(frame.size + 100);
-    marshal(frame, os)
-    os.toBuffer
-  }
-
   def unmarshal(packet:Buffer) = {
     throw new UnsupportedOperationException
   }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompBrokerPerfTest.scala Wed Jul  7 03:45:17 2010
@@ -21,7 +21,6 @@ import _root_.org.apache.activemq.apollo
 import _root_.org.apache.activemq.apollo.broker.perf._
 import _root_.org.apache.activemq.apollo.stomp._
 
-import _root_.org.apache.activemq.transport.CompletionCallback
 import _root_.org.apache.activemq.util.buffer._
 import collection.mutable.{ListBuffer, HashMap}
 
@@ -29,6 +28,7 @@ import AsciiBuffer._
 import Stomp._
 import _root_.org.apache.activemq.apollo.stomp.StompFrame
 import _root_.org.fusesource.hawtdispatch.ScalaDispatch._
+import org.fusesource.hawtdispatch.BaseRetained
 
 object StompBrokerPerfTest {
   def main(args:Array[String]) = {
@@ -55,7 +55,7 @@ class StompRemoteConsumer extends Remote
         }
 
         var frame = StompFrame(Stomp.Commands.CONNECT);
-        transport.oneway(frame);
+        transport.oneway(frame, null);
 
         var headers:List[(AsciiBuffer, AsciiBuffer)] = Nil
         headers ::= (Stomp.Headers.Subscribe.DESTINATION, stompDestination)
@@ -63,7 +63,7 @@ class StompRemoteConsumer extends Remote
         headers ::= (Stomp.Headers.Subscribe.ACK_MODE, Stomp.Headers.Subscribe.AckModeValues.AUTO)
 
         frame = StompFrame(Stomp.Commands.SUBSCRIBE, headers);
-        transport.oneway(frame);
+        transport.oneway(frame, null);
     }
 
     def onTransportCommand(command:Object) = {
@@ -99,36 +99,35 @@ class StompRemoteProducer extends Remote
 
     var stompDestination:AsciiBuffer = null
 
-    val send_next:CompletionCallback = new CompletionCallback() {
-      def onCompletion() = {
+    def send_next:Unit = {
+      var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
+      headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
+      if (property != null) {
+          headers ::= (ascii(property), ascii(property));
+      }
+//    var p = this.priority;
+//    if (priorityMod > 0) {
+//        p = if ((counter % priorityMod) == 0) { 0 } else { priority }
+//    }
+
+      var content = ascii(createPayload());
+      val frame = StompFrame(Stomp.Commands.SEND, headers, content)
+      val delivery = new BaseRetained()
+      delivery.setDisposer(^{
         rate.increment();
         val task = ^ {
           if( !stopping ) {
-
-            var headers: List[(AsciiBuffer, AsciiBuffer)] = Nil
-            headers ::= (Stomp.Headers.Send.DESTINATION, stompDestination);
-            if (property != null) {
-                headers ::= (ascii(property), ascii(property));
-            }
-//          var p = this.priority;
-//          if (priorityMod > 0) {
-//              p = if ((counter % priorityMod) == 0) { 0 } else { priority }
-//          }
-
-            var content = ascii(createPayload());
-            transport.oneway(StompFrame(Stomp.Commands.SEND, headers, content), send_next)
+            send_next
           }
-        } 
+        }
         if( thinkTime > 0 ) {
           dispatchQueue.dispatchAfter(thinkTime, TimeUnit.MILLISECONDS, task)
         } else {
           dispatchQueue << task
         }
-      }
-      def onFailure(error:Exception) = {
-        println("stopping due to: "+error);
-        stop
-      }
+      })
+      transport.oneway(frame, delivery)
+      delivery.release
     }
 
     override def setupProducer() = {
@@ -137,7 +136,8 @@ class StompRemoteProducer extends Remote
       } else {
           stompDestination = ascii("/topic/"+destination.getName().toString());
       }
-      transport.oneway(StompFrame(Stomp.Commands.CONNECT), send_next);
+      transport.oneway(StompFrame(Stomp.Commands.CONNECT), null);
+
     }
 
     def onTransportCommand(command:Object) = {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-stomp/src/test/scala/org/apache/activemq/apollo/stomp/perf/StompLoadClient.scala Wed Jul  7 03:45:17 2010
@@ -36,7 +36,7 @@ object StompLoadClient {
   import StompLoadClient._
   implicit def toAsciiBuffer(value: String) = new AsciiBuffer(value)
 
-  var producerSleep = 1000*1000000;
+  var producerSleep = 0;
   var consumerSleep = 0;
   var producers = 1;
   var consumers = 1;

Modified: activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-tcp/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Wed Jul  7 03:45:17 2010
@@ -16,7 +16,6 @@
  */
 package org.apache.activemq.transport.tcp;
 
-import org.apache.activemq.transport.CompletionCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.buffer.Buffer;
@@ -25,6 +24,7 @@ import org.apache.activemq.wireformat.Wi
 import org.fusesource.hawtdispatch.Dispatch;
 import org.fusesource.hawtdispatch.DispatchQueue;
 import org.fusesource.hawtdispatch.DispatchSource;
+import org.fusesource.hawtdispatch.Retained;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -32,6 +32,7 @@ import java.net.*;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.Map;
 
@@ -40,7 +41,7 @@ import static org.apache.activemq.transp
 
 /**
  * An implementation of the {@link Transport} interface using raw tcp/ip
- * 
+ *
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
  */
 public class TcpTransport implements Transport {
@@ -75,20 +76,21 @@ public class TcpTransport implements Tra
     private DispatchSource writeSource;
 
     final LinkedList<OneWay> outbound = new LinkedList<OneWay>();
-    int maxOutbound = 1024*32;
+    int outboundSize = 0;
+    int maxOutbound = 1024 * 32;
     ByteBuffer outbound_frame;
     protected boolean useLocalHost = true;
 
-    int READ_BUFFFER_SIZE = 1024*32;
-    ByteBuffer readBuffer = ByteBuffer.allocate(1024*32);
+    int READ_BUFFFER_SIZE = 1024 * 32;
+    ByteBuffer readBuffer = ByteBuffer.allocate(1024 * 32);
 
 
     static final class OneWay {
         final Buffer buffer;
-        final CompletionCallback callback;
+        final Retained retained;
 
-        public OneWay(Buffer buffer, CompletionCallback callback) {
-            this.callback = callback;
+        public OneWay(Buffer buffer, Retained retained) {
+            this.retained = retained;
             this.buffer = buffer;
         }
     }
@@ -111,11 +113,11 @@ public class TcpTransport implements Tra
     }
 
     public void setDispatchQueue(DispatchQueue queue) {
-        if( dispatchQueue!=null ) {
+        if (dispatchQueue != null) {
             dispatchQueue.release();
         }
         this.dispatchQueue = queue;
-        if( dispatchQueue!=null ) {
+        if (dispatchQueue != null) {
             dispatchQueue.retain();
         }
     }
@@ -127,18 +129,18 @@ public class TcpTransport implements Tra
         if (listener == null) {
             throw new IllegalArgumentException("listener is not set");
         }
-        if( transportState!=CREATED ) {
+        if (transportState != CREATED) {
             throw new IllegalStateException("can only be started from the created stae");
         }
-        transportState=RUNNING;
-        
+        transportState = RUNNING;
+
         unmarshalSession = wireformat.createUnmarshalSession();
 
-        if( socketState == CONNECTING ) {
+        if (socketState == CONNECTING) {
             channel = SocketChannel.open();
         }
         channel.configureBlocking(false);
-        if( socketState == CONNECTING ) {
+        if (socketState == CONNECTING) {
 
             if (localLocation != null) {
                 InetSocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
@@ -152,7 +154,7 @@ public class TcpTransport implements Tra
             final DispatchSource connectSource = Dispatch.createSource(channel, SelectionKey.OP_CONNECT, dispatchQueue);
             connectSource.setEventHandler(new Runnable() {
                 public void run() {
-                    if( transportState==RUNNING ) {
+                    if (transportState == RUNNING) {
                         try {
                             socketState = CONNECTED;
                             channel.finishConnect();
@@ -190,7 +192,7 @@ public class TcpTransport implements Tra
         }
 
         readSource = Dispatch.createSource(channel, SelectionKey.OP_READ, dispatchQueue);
-        readSource.setEventHandler(new Runnable(){
+        readSource.setEventHandler(new Runnable() {
             public void run() {
                 try {
                     drainInbound();
@@ -201,12 +203,12 @@ public class TcpTransport implements Tra
         });
 
         writeSource = Dispatch.createSource(channel, SelectionKey.OP_WRITE, dispatchQueue);
-        writeSource.setEventHandler(new Runnable(){
+        writeSource.setEventHandler(new Runnable() {
             public void run() {
-                if( transportState==RUNNING ) {
+                if (transportState == RUNNING) {
                     // once the outbound is drained.. we can suspend getting
                     // write events.
-                    if( drainOutbound() ) {
+                    if (drainOutbound()) {
                         writeSource.suspend();
                     }
                 }
@@ -219,33 +221,31 @@ public class TcpTransport implements Tra
 
 
     public void stop() throws Exception {
-        if( readSource!=null ) {
+        if (readSource != null) {
             readSource.release();
             readSource = null;
         }
-        if( writeSource!=null ) {
+        if (writeSource != null) {
             writeSource.release();
             writeSource = null;
         }
         setDispatchQueue(null);
-        transportState=DISPOSED;
+        transportState = DISPOSED;
     }
 
-    @Deprecated
-    public void oneway(Object command) {
-        oneway(command, null);
+
+    public boolean isFull() {
+        return outboundSize >= maxOutbound;
     }
 
-    public void oneway(Object command, CompletionCallback callback) {
+    public void oneway(Object command, Retained retained) {
         assert Dispatch.getCurrentQueue() == dispatchQueue;
         try {
-            if( socketState != CONNECTED ) {
-                throw new IllegalStateException("Not connected.");
-            }
-        } catch (IllegalStateException e) {
-            if( callback!=null ) {
-                callback.onFailure(e);
+            if (socketState != CONNECTED) {
+                throw new IOException("Not connected.");
             }
+        } catch (IOException e) {
+            listener.onTransportFailure(e);
         }
 
         // Marshall the command.
@@ -253,79 +253,95 @@ public class TcpTransport implements Tra
         try {
             buffer = wireformat.marshal(command);
         } catch (IOException e) {
-            callback.onFailure(e);
+            listener.onTransportFailure(e);
             return;
         }
 
-        outbound.add(new OneWay(buffer, callback));
+        OneWay oneway;
+        if (retained!=null && isFull() ) {
+            // retaining blocks the sender until it is released.
+            retained.retain();
+            oneway = new OneWay(buffer, retained);
+        } else {
+            oneway = new OneWay(buffer, null);
+        }
+        outbound.add(oneway);
+        outboundSize += buffer.length;
 
         // wait for write ready events if this write
         // cannot be drained.
-        if( outbound.size()==1 && !drainOutbound() ) {
+        if (outbound.size() == 1 && !drainOutbound()) {
             writeSource.resume();
         }
     }
 
     /**
-    * @retruns true if the outbound has been drained of all objects and there are no in progress writes.
-    */
+     * @return true if the outbound has been drained of all objects and there are no in progress writes.
+     */
     private boolean drainOutbound() {
         try {
-            
-            while(socketState == CONNECTED) {
-                
-              // if we have a pending write that is being sent over the socket...
-              if( outbound_frame!=null ) {
-
-                channel.write(outbound_frame);
-                if( outbound_frame.remaining() != 0 ) {
-                  return false;
-                } else {
-                  outbound_frame = null;
-                }
 
-              } else {
+            while (socketState == CONNECTED) {
 
-                // marshall all the available frames..
-                ByteArrayOutputStream buffer = new ByteArrayOutputStream(maxOutbound << 2);
-                OneWay oneWay = outbound.poll();
-
-                while( oneWay!=null) {
-                    buffer.write(oneWay.buffer);
-                    if( oneWay.callback!=null ) {
-                        oneWay.callback.onCompletion();
-                    }
-                    if( buffer.size() < maxOutbound ) {
-                        oneWay = outbound.poll();
+                // if we have a pending write that is being sent over the socket...
+                if (outbound_frame != null) {
+
+                    channel.write(outbound_frame);
+                    if (outbound_frame.remaining() != 0) {
+                        return false;
                     } else {
-                        oneWay = null;
+                        outbound_frame = null;
                     }
-                }
-
 
-                if( buffer.size()==0 ) {
-                  // the source is now drained...
-                  return true;
                 } else {
-                  outbound_frame = buffer.toBuffer().toByteBuffer();
+
+                    // marshall all the available frames..
+                    OneWay oneWay = outbound.poll();
+
+                    int size = 0;
+                    ArrayList<Buffer> buffers = new ArrayList<Buffer>(outbound.size());
+                    while (oneWay != null) {
+                        size+=oneWay.buffer.length;
+                        buffers.add(oneWay.buffer);
+                        if (oneWay.retained != null) {
+                            oneWay.retained.release();
+                        }
+                        if (size < maxOutbound) {
+                            oneWay = outbound.poll();
+                        } else {
+                            oneWay = null;
+                        }
+                    }
+
+                    if (size == 0) {
+                        // the source is now drained...
+                        return true;
+                    } else {
+                        // Make the write just one big buffer.
+                        outboundSize -= size;
+                        ByteArrayOutputStream buffer = new ByteArrayOutputStream(size);
+                        for (Buffer b : buffers) {
+                            buffer.write(b);
+                        }
+                        outbound_frame = buffer.toBuffer().toByteBuffer();
+                    }
                 }
-              }
 
             }
 
         } catch (IOException e) {
             listener.onTransportFailure(e);
         }
-        
-        return outbound.isEmpty() && outbound_frame==null;
+
+        return outbound.isEmpty() && outbound_frame == null;
     }
 
 
     private void drainInbound() throws IOException {
-        if( transportState==DISPOSED || readSource.isSuspended() ) {
+        if (transportState == DISPOSED || readSource.isSuspended()) {
             return;
         }
-        while( true ) {
+        while (true) {
 
             // do we need to read in more data???
             if (unmarshalSession.getEndPos() == readBuffer.position()) {
@@ -359,12 +375,12 @@ public class TcpTransport implements Tra
                 }
             }
 
-            Object command=unmarshalSession.unmarshal(readBuffer);
-            if( command!=null ) {
+            Object command = unmarshalSession.unmarshal(readBuffer);
+            if (command != null) {
                 listener.onTransportCommand(command);
 
                 // the transport may be suspended after processing a command.
-                if( transportState==DISPOSED || readSource.isSuspended() ) {
+                if (transportState == DISPOSED || readSource.isSuspended()) {
                     return;
                 }
             }
@@ -391,14 +407,15 @@ public class TcpTransport implements Tra
     public void resumeRead() {
         readSource.resume();
     }
-    
-    public void reconnect(URI uri, CompletionCallback callback) {
+
+    public void reconnect(URI uri) {
         throw new UnsupportedOperationException();
     }
 
     public TransportListener getTransportListener() {
         return listener;
     }
+
     public void setTransportListener(TransportListener listener) {
         this.listener = listener;
     }
@@ -406,6 +423,7 @@ public class TcpTransport implements Tra
     public WireFormat getWireformat() {
         return wireformat;
     }
+
     public void setWireformat(WireFormat wireformat) {
         this.wireformat = wireformat;
     }
@@ -417,6 +435,7 @@ public class TcpTransport implements Tra
     public boolean isDisposed() {
         return transportState == DISPOSED;
     }
+
     public boolean isFaultTolerant() {
         return false;
     }
@@ -677,6 +696,7 @@ public class TcpTransport implements Tra
 //        this.minmumWireFormatVersion = minmumWireFormatVersion;
 //    }
 //
+
     public boolean isUseLocalHost() {
         return useLocalHost;
     }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/Transport.java Wed Jul  7 03:45:17 2010
@@ -22,6 +22,7 @@ import java.net.URI;
 import org.apache.activemq.Service;
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Retained;
 
 /**
  * Represents an abstract connection.  It can be a client side or server side connection.
@@ -30,18 +31,16 @@ import org.fusesource.hawtdispatch.Dispa
  */
 public interface Transport extends Service {
 
-    @Deprecated
-    void oneway(Object command);
+
+    boolean isFull();
 
     /**
-     * A one way asynchronous send.  Once the command is transmitted the callback
-     * is invoked.
+     * A one way asynchronous send.
      * 
      * @param command
-     * @param callback
      * @throws IOException
      */
-    void oneway(Object command, CompletionCallback callback);
+    void oneway(Object command, Retained retained);
 
     /**
      * Returns the current transport listener
@@ -121,6 +120,6 @@ public interface Transport extends Servi
      * @param uri
      * @throws IOException on failure of if not supported
      */
-    void reconnect(URI uri, CompletionCallback callback);
+    void reconnect(URI uri);
 
 }

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/TransportFilter.java Wed Jul  7 03:45:17 2010
@@ -21,6 +21,7 @@ import java.net.URI;
 
 import org.apache.activemq.wireformat.WireFormat;
 import org.fusesource.hawtdispatch.DispatchQueue;
+import org.fusesource.hawtdispatch.Retained;
 
 /**
  * @version $Revision: 1.5 $
@@ -105,16 +106,14 @@ public class TransportFilter implements 
         return next.toString();
     }
 
-    @Deprecated
-    public void oneway(Object command) {
-        oneway(command, null);
+    public void oneway(Object command, Retained retained) {
+        next.oneway(command, retained);
     }
 
-    public void oneway(Object command, CompletionCallback callback) {
-        next.oneway(command, callback);
+    public boolean isFull() {
+        return next.isFull();
     }
 
-
     public void onTransportFailure(IOException error) {
         transportListener.onTransportFailure(error);
     }
@@ -154,8 +153,8 @@ public class TransportFilter implements 
         return next.isConnected();
     }
 
-    public void reconnect(URI uri, CompletionCallback callback) {
-        next.reconnect(uri, callback);
+    public void reconnect(URI uri) {
+        next.reconnect(uri);
     }
 
     public WireFormat getWireformat() {

Modified: activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java?rev=961077&r1=961076&r2=961077&view=diff
==============================================================================
--- activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java (original)
+++ activemq/sandbox/activemq-apollo-actor/activemq-transport/src/main/java/org/apache/activemq/transport/pipe/PipeTransport.java Wed Jul  7 03:45:17 2010
@@ -16,15 +16,11 @@
  */
 package org.apache.activemq.transport.pipe;
 
-import org.apache.activemq.transport.CompletionCallback;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.util.buffer.Buffer;
 import org.apache.activemq.wireformat.WireFormat;
-import org.fusesource.hawtdispatch.CustomDispatchSource;
-import org.fusesource.hawtdispatch.Dispatch;
-import org.fusesource.hawtdispatch.DispatchQueue;
-import org.fusesource.hawtdispatch.EventAggregators;
+import org.fusesource.hawtdispatch.*;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -139,11 +135,11 @@ public class PipeTransport implements Tr
 
     static final class OneWay {
         final Object command;
-        final CompletionCallback callback;
+        final Retained retained;
 
-        public OneWay(Object command, CompletionCallback callback) {
-            this.callback = callback;
+        public OneWay(Object command, Retained retained) {
             this.command = command;
+            this.retained = retained;
         }
     }
 
@@ -151,34 +147,34 @@ public class PipeTransport implements Tr
     int outbound = 0;
     int maxOutbound = 100;
 
-    @Deprecated
-    public void oneway(Object command) {
-        oneway(command, null);
+    public boolean isFull() {
+        return outbound >= maxOutbound;
     }
 
-    public void oneway(Object command, CompletionCallback callback) {
+    public void oneway(Object command, Retained retained) {
         if( !connected ) {
             throw new IllegalStateException("Not connected.");
         }
-        if( outbound < maxOutbound ) {
-            transmit(command, callback);
+        if( isFull() && retained!=null) {
+            retained.retain();
+            inbound.add(new OneWay(command, retained));
         } else {
-            inbound.add(new OneWay(command, callback));
+            transmit(command, null);
         }
     }
 
     private void drainInbound() {
-        while( outbound < maxOutbound && !inbound.isEmpty() ) {
+        while( !isFull() && !inbound.isEmpty() ) {
             OneWay oneWay = inbound.poll();
-            transmit(oneWay.command, oneWay.callback);
+            transmit(oneWay.command, oneWay.retained);
         }
     }
 
-    private void transmit(Object command, CompletionCallback callback) {
+    private void transmit(Object command, Retained retained) {
         outbound++;
         peer.dispatchSource.merge(command);
-        if( callback!=null ) {
-            callback.onCompletion();
+        if( retained!=null ) {
+            retained.release();
         }
     }
 
@@ -200,7 +196,7 @@ public class PipeTransport implements Tr
     public void resumeRead() {
         dispatchSource.resume();
     }
-    public void reconnect(URI uri, CompletionCallback callback) {
+    public void reconnect(URI uri) {
         throw new UnsupportedOperationException();
     }