You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/05 21:33:12 UTC

svn commit: r682887 [1/2] - in /incubator/qpid/trunk/qpid/java: client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/ client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/ client/example/src/main/java/org/apache/qp...

Author: rhs
Date: Tue Aug  5 12:33:11 2008
New Revision: 682887

URL: http://svn.apache.org/viewvc?rev=682887&view=rev
Log:
Profiling driven changes:

 - made AMQShortString cache the toString() value

 - added static initializer to IoTransport to disable use of pooled
   byte buffers

 - modified IoSender to permit buffering

 - removed OutputHandler and eliminated intermediate Frame generation
   between Disassembler and Sender<ByteBuffer> (IoSender)

 - made Disassembler take advantage of IoSender's buffering

 - removed Header and Data as distinct protocol events, added Header
   and Body members to MessageTransfer

 - modified Assembler and Disassembler to decode/encode Header and
   Data directly to/from MessageTransfer

 - modified Disassembler to only write data if encoding of headers is
   successful

 - added Strings.toUTF8(String) -> byte[] to do proper UTF-8 encoding
   that is also fast for 7-bit ascii

 - modified JMSTextMessage to use the Strings.toUTF8

 - modified QpidBench to only generate 7-bit ascii when using
   TextMessage

Added:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java   (with props)
Removed:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/LargeMsgDemoClient.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Data.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/OutputHandler.java
Modified:
    incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
    incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java
    incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
    incubator/qpid/trunk/qpid/java/common/Composite.tpl
    incubator/qpid/trunk/qpid/java/common/Invoker.tpl
    incubator/qpid/trunk/qpid/java/common/genutil.py
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
    incubator/qpid/trunk/qpid/java/tools/bin/qpid-bench
    incubator/qpid/trunk/qpid/java/tools/src/main/java/org/apache/qpid/tools/QpidBench.java

Modified: incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java Tue Aug  5 12:33:11 2008
@@ -8,6 +8,7 @@
 import org.apache.qpid.nclient.Session;
 import org.apache.qpid.nclient.util.MessageListener;
 import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
 
@@ -67,16 +68,14 @@
 
         for (int i=0; i<10; i++)
         {
-            session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED);
-            session.header(deliveryProps);
-            session.data("Message " + i);
-            session.endData();
+            session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED,
+                                    new Header(deliveryProps),
+                                    "Message " + i);
         }
 
-        session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED);
-        session.header(deliveryProps);
-        session.data("That's all, folks!");
-        session.endData();
+        session.messageTransfer("amq.direct", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
+                                new Header(deliveryProps),
+                                "That's all, folks!");
 
         // confirm completion
         session.sync();

Modified: incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fanout/FannoutProducer.java Tue Aug  5 12:33:11 2008
@@ -4,6 +4,7 @@
 import org.apache.qpid.nclient.Connection;
 import org.apache.qpid.nclient.Session;
 import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
 
@@ -34,16 +35,13 @@
 
         for (int i=0; i<10; i++)
         {
-            session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED);
-            session.header(deliveryProps);
-            session.data("Message " + i);
-            session.endData();
+            session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
+                                    new Header(deliveryProps), "Message " + i);
         }
 
-        session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED);
-        session.header(deliveryProps);
-        session.data("That's all, folks!");
-        session.endData();
+        session.messageTransfer("amq.fanout", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
+                                new Header(deliveryProps),
+                                "That's all, folks!");
 
         // confirm completion
         session.sync();

Modified: incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java (original)
+++ incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java Tue Aug  5 12:33:11 2008
@@ -4,6 +4,7 @@
 import org.apache.qpid.nclient.Connection;
 import org.apache.qpid.nclient.Session;
 import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
 
@@ -18,20 +19,17 @@
       deliveryProps.setRoutingKey(routing_key);
 
       for (int i=0; i<5; i++) {
-        session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED);
-        session.header(deliveryProps);
-        session.data("Message " + i);
-        session.endData();
+          session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
+                                  new Header(deliveryProps), "Message " + i);
       }
 
     }
 
     public void noMoreMessages(Session session)
     {
-        session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED);
-        session.header(new DeliveryProperties().setRoutingKey("control"));
-        session.data("That's all, folks!");
-        session.endData();
+        session.messageTransfer("amq.topic", MessageAcceptMode.EXPLICIT, MessageAcquireMode.PRE_ACQUIRED,
+                                new Header(new DeliveryProperties().setRoutingKey("control")),
+                                "That's all, folks!");
     }
 
     public static void main(String[] args)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Tue Aug  5 12:33:11 2008
@@ -31,6 +31,7 @@
 import org.apache.qpid.client.CustomJMSXProperty;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.util.Strings;
 
 public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
 {
@@ -111,20 +112,17 @@
         try
         {
             if (text != null)
-            {                
-                _data = ByteBuffer.allocate(text.length());
-                _data.limit(text.length()) ;
-                //_data.sweep();
-                _data.setAutoExpand(true);
+            {
                 final String encoding = getContentHeaderProperties().getEncodingAsString();
-                if (encoding == null)
+                if (encoding == null || encoding.equalsIgnoreCase("UTF-8"))
                 {
-                    _data.put(text.getBytes(DEFAULT_CHARSET.name()));
+                    _data = ByteBuffer.wrap(Strings.toUTF8(text));
                 }
                 else
                 {
-                    _data.put(text.getBytes(encoding));
+                    _data = ByteBuffer.wrap(text.getBytes(encoding));
                 }
+                _data.position(_data.limit());
                 _changedData=true;
             }
             _decodedValue = text;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java Tue Aug  5 12:33:11 2008
@@ -109,42 +109,6 @@
 
 
     /**
-     * <p>This transfer streams a complete message using a single method. 
-     * It uses pull-semantics instead of doing a push.</p>
-     * <p>Data is pulled from a Message object using read()
-     * and pushed using messageTransfer() and headers() followed by data() and endData().
-     * <br><b><i>This method should only be used by large messages</b></i><br>
-     * There are two convenience Message classes to do this.
-     * <ul>
-     * <li> <code>{@link org.apache.qpid.nclient.util.FileMessage}</code>
-     * <li> <code>{@link org.apache.qpid.nclient.util.StreamingMessage}</code>
-     * </ul>
-     * You can also implement a <code>Message</code> interface to wrap any
-     * data stream.
-     * </p>
-     *
-     * @param destination The exchange the message is being sent to.
-     * @param msg         The Message to be sent.
-     * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
-     *                    is not required. Once a message has been transferred in pre-acquire
-     *                    mode (or once acquire has been sent in no-acquire mode) the message is considered
-     *                    transferred.
-     *                    <p/>
-     *                    <li> on  ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
-     *                    is not considered transferred until the original
-     *                    transfer is complete. A complete transfer is signaled by execution.complete.
-     *                    </ul>
-     * @param acquireMode <ul>
-     *                    <li> no-acquire  ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message
-     *                    must be explicitly acquired.
-     *                    <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message
-     *                    is acquired when the transfer starts.
-     *                    </ul>
-     * @throws java.io.IOException If transferring a message fails due to some internal communication error, an exception is thrown.
-     */
-    public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException;
-
-    /**
      * This command transfers a message between two peers.
      *
      * @param destination Specifies the destination to which the message is to be transferred.
@@ -154,46 +118,31 @@
      * @param acquireMode Indicates whether or not the transferred message has been acquired.
      */
     public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
-                                Option ... options);
-
-    /**
-     * Make a set of headers to be sent together with a message
-     *
-     * @param headers headers to be added
-     * @see org.apache.qpid.transport.DeliveryProperties
-     * @see org.apache.qpid.transport.MessageProperties
-     * @return The added headers.
-     */
-    public Header header(Struct... headers);
+                                Header header, ByteBuffer body, Option ... options);
 
     /**
-     * Add a byte array to the content of the message being sent.
-     *
-     * @param data Data to be added.
-     */
-    public void data(byte[] data);
-
-    /**
-     * A Add a ByteBuffer to the content of the message being sent.
-     * <p> Note that only the data between the buffer's current position and the
-     * buffer limit is added.
-     * It is therefore recommended to flip the buffer before adding it to the message,
+     * This command transfers a message between two peers.
      *
-     * @param buf Data to be added.
+     * @param destination Specifies the destination to which the message is to be transferred.
+     * @param acceptMode Indicates whether message.accept, session.complete,
+     *                  or nothing at all is required to indicate successful transfer of the message.
+     * 
+     * @param acquireMode Indicates whether or not the transferred message has been acquired.
      */
-    public void data(ByteBuffer buf);
+    public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
+                                Header header, byte[] body, Option ... options);
 
     /**
-     * Add a string to the content of the message being sent.
+     * This command transfers a message between two peers.
      *
-     * @param str String to be added.
-     */
-    public void data(String str);
-
-    /**
-     * Signals the end of data for the message.
+     * @param destination Specifies the destination to which the message is to be transferred.
+     * @param acceptMode Indicates whether message.accept, session.complete,
+     *                  or nothing at all is required to indicate successful transfer of the message.
+     * 
+     * @param acquireMode Indicates whether or not the transferred message has been acquired.
      */
-    public void endData();
+    public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
+                                Header header, String body, Option ... options);
 
     //------------------------------------------------------
     //                 Messaging methods

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java Tue Aug  5 12:33:11 2008
@@ -11,8 +11,11 @@
 import org.apache.qpid.api.Message;
 import org.apache.qpid.nclient.ClosedListener;
 import org.apache.qpid.nclient.MessagePartListener;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageProperties;
 import org.apache.qpid.transport.Option;
 import org.apache.qpid.transport.Range;
 import org.apache.qpid.transport.RangeSet;
@@ -85,24 +88,29 @@
 
     public void messageTransfer(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
     {
-        // The javadoc clearly says that this method is suitable for small messages
-        // therefore reading the content in one shot.
-        ByteBuffer  data = msg.readData();
-        super.messageTransfer(destination, MessageAcceptMode.get(acceptMode),
-                              MessageAcquireMode.get(acquireMode));
-       // super.header(msg.getDeliveryProperties(),msg.getMessageProperties()  );
-        if( msg.getHeader() == null || msg.getDeliveryProperties().isDirty() || msg.getMessageProperties().isDirty() )
-        {
-            msg.setHeader( super.header(msg.getDeliveryProperties(),msg.getMessageProperties()) );
-            msg.getDeliveryProperties().setDirty(false);
-            msg.getMessageProperties().setDirty(false);
+        DeliveryProperties dp = msg.getDeliveryProperties();
+        MessageProperties mp = msg.getMessageProperties();
+        Header header;
+        if (msg.getHeader() == null || dp.isDirty() || mp.isDirty())
+        {
+            header = new Header(dp, mp);
+            msg.setHeader(header);
+            dp.setDirty(false);
+            mp.setDirty(false);
         }
         else
         {
-            super.header(msg.getHeader());
+            header = msg.getHeader();
         }
-        data( data );
-        endData();
+        // The javadoc clearly says that this method is suitable for small messages
+        // therefore reading the content in one shot.
+        ByteBuffer  body = msg.readData();
+        int size = body.remaining();
+        super.messageTransfer
+            (destination, MessageAcceptMode.get(acceptMode),
+             MessageAcquireMode.get(acquireMode), header, body);
+        _currentDataSizeNotSynced += size;
+        _currentDataSizeNotFlushed += size;
     }
 
     public void sync()
@@ -111,65 +119,6 @@
         _currentDataSizeNotSynced = 0;
     }
 
-    /* -------------------------
-     * Data methods
-     * ------------------------*/
-
-    public void data(ByteBuffer buf)
-    {
-        _currentDataSizeNotSynced = _currentDataSizeNotSynced + buf.remaining();
-        _currentDataSizeNotFlushed = _currentDataSizeNotFlushed + buf.remaining();
-        super.data(buf);
-    }
-
-    public void data(String str)
-    {
-        _currentDataSizeNotSynced = _currentDataSizeNotSynced + str.getBytes().length;
-        super.data(str);
-    }
-
-    public void data(byte[] bytes)
-    {
-        _currentDataSizeNotSynced = _currentDataSizeNotSynced + bytes.length;
-        super.data(bytes);
-    }
-
-    public void messageStream(String destination, Message msg, short acceptMode, short acquireMode) throws IOException
-    {
-        super.messageTransfer(destination, MessageAcceptMode.get(acceptMode),
-                              MessageAcquireMode.get(acquireMode));
-        super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
-        boolean b = true;
-        int count = 0;
-        while(b)
-        {
-            try
-            {
-                System.out.println("count : " + count++);
-                data(msg.readData());
-            }
-            catch(EOFException e)
-            {
-                b = false;
-            }
-        }
-        endData();
-    }
-
-    public void endData()
-    {
-        super.endData();
-    /*    if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= MAX_NOT_SYNC_DATA_LENGH)
-        {
-            sync();
-        }
-         if( MAX_NOT_FLUSH_DATA_LENGH != -1 && _currentDataSizeNotFlushed >= MAX_NOT_FLUSH_DATA_LENGH)
-        {
-           executionFlush();
-            _currentDataSizeNotFlushed = 0;
-        }*/
-    }
-
     public RangeSet getRejectedMessages()
     {
         return _rejectedMessages;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java Tue Aug  5 12:33:11 2008
@@ -7,7 +7,6 @@
 import org.apache.qpid.nclient.MessagePartListener;
 
 import org.apache.qpid.QpidException;
-import org.apache.qpid.transport.Data;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageReject;
 import org.apache.qpid.transport.MessageTransfer;
@@ -18,46 +17,27 @@
 
 
 public class ClientSessionDelegate extends SessionDelegate
-{    
-    private MessageTransfer _currentTransfer;
-    private MessagePartListener _currentMessageListener;
-    
-    @Override public void sessionDetached(Session ssn, SessionDetached dtc)
-    {
-        ((ClientSession)ssn).notifyException(new QpidException("", ErrorCode.get(dtc.getCode().getValue()),null));
-    }
-    
+{
+
     //  --------------------------------------------
     //   Message methods
     // --------------------------------------------
-    @Override public void data(Session ssn, Data data)
+    @Override public void messageTransfer(Session session, MessageTransfer xfr)
     {
-        _currentMessageListener.data(data.getData());
-        if (data.isLast())
+        MessagePartListener listener = ((ClientSession)session).getMessageListeners()
+            .get(xfr.getDestination());
+        listener.messageTransfer(xfr.getId());
+        listener.messageHeader(xfr.getHeader());
+        ByteBuffer body = xfr.getBody();
+        if (body == null)
         {
-            _currentMessageListener.messageReceived();
+            body = ByteBuffer.allocate(0);
         }
+        listener.data(body);
+        listener.messageReceived();
     }
 
-    @Override public void header(Session ssn, Header header)
-    {
-        _currentMessageListener.messageHeader(header);
-        if( header.hasNoPayload())
-        {
-           _currentMessageListener.data(ByteBuffer.allocate(0));
-           _currentMessageListener.messageReceived();
-        }
-    }
-
-
-    @Override public void messageTransfer(Session session, MessageTransfer currentTransfer)
-    {
-        _currentTransfer = currentTransfer;
-        _currentMessageListener = ((ClientSession)session).getMessageListeners().get(currentTransfer.getDestination());
-        _currentMessageListener.messageTransfer(currentTransfer.getId());
-    }
-    
-    @Override public void messageReject(Session session, MessageReject struct) 
+    @Override public void messageReject(Session session, MessageReject struct)
     {
         for (Range range : struct.getTransfers())
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/DemoClient.java Tue Aug  5 12:33:11 2008
@@ -9,10 +9,12 @@
 import org.apache.qpid.nclient.util.MessageListener;
 import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
 import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
 import org.apache.qpid.transport.MessageProperties;
 
+import java.nio.ByteBuffer;
 import java.util.UUID;
 
 public class DemoClient
@@ -56,17 +58,15 @@
         ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
 
         // queue
-        ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
-        ssn.header(new DeliveryProperties().setRoutingKey("queue1"),
-                   new MessageProperties().setMessageId(UUID.randomUUID()));
-        ssn.data("this is the data");
-        ssn.endData();
+        ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+                            new Header(new DeliveryProperties().setRoutingKey("queue1"),
+                                       new MessageProperties().setMessageId(UUID.randomUUID())),
+                            ByteBuffer.wrap("this is the data".getBytes()));
 
         //reject
-        ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
-        ssn.data("this should be rejected");
-        ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
-        ssn.endData();
+        ssn.messageTransfer("amq.direct", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+                            new Header(new DeliveryProperties().setRoutingKey("stocks")),
+                            ByteBuffer.wrap("this should be rejected".getBytes()));
         ssn.sync();
 
         // topic subs
@@ -84,11 +84,10 @@
         ssn.sync();
 
         // topic
-        ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
-        ssn.data("Topic message");
-        ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),
-                   new MessageProperties().setMessageId(UUID.randomUUID()));
-        ssn.endData();
+        ssn.messageTransfer("amq.topic", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+                            new Header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),
+                                       new MessageProperties().setMessageId(UUID.randomUUID())),
+                            ByteBuffer.wrap("Topic message".getBytes()));
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/interop/BasicInteropTest.java Tue Aug  5 12:33:11 2008
@@ -1,5 +1,6 @@
 package org.apache.qpid.nclient.interop;
 
+import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -13,6 +14,7 @@
 import org.apache.qpid.nclient.util.MessageListener;
 import org.apache.qpid.nclient.util.MessagePartListenerAdapter;
 import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.MessageAcceptMode;
 import org.apache.qpid.transport.MessageAcquireMode;
 import org.apache.qpid.transport.MessageCreditUnit;
@@ -77,18 +79,15 @@
 
     public void testSendMessage(){
         System.out.println("------- Sending a message --------");
-        session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED);
-
         Map<String,Object> props = new HashMap<String,Object>();
         props.put("name", "rajith");
         props.put("age", 10);
         props.put("spf", 8.5);
-        session.header(new DeliveryProperties().setRoutingKey("testKey"),new MessageProperties().setApplicationHeaders(props));
-
-        //session.header(new DeliveryProperties().setRoutingKey("testKey"));
+        session.messageTransfer("test", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+                                new Header(new DeliveryProperties().setRoutingKey("testKey"),
+                                           new MessageProperties().setApplicationHeaders(props)),
+                                ByteBuffer.wrap("TestMessage".getBytes()));
 
-        session.data("TestMessage");
-        session.endData();
         session.sync();
         System.out.println("------- Message sent --------");
     }

Modified: incubator/qpid/trunk/qpid/java/common/Composite.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Composite.tpl?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Composite.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Composite.tpl Tue Aug  5 12:33:11 2008
@@ -1,5 +1,6 @@
 package org.apache.qpid.transport;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -9,7 +10,6 @@
 import org.apache.qpid.transport.codec.Decoder;
 import org.apache.qpid.transport.codec.Encodable;
 import org.apache.qpid.transport.codec.Encoder;
-import org.apache.qpid.transport.codec.Validator;
 
 import org.apache.qpid.transport.network.Frame;
 
@@ -18,11 +18,13 @@
 
 cls = klass(type)["@name"]
 
+segments = type["segments"]
+
 if type.name in ("control", "command"):
   base = "Method"
   size = 0
   pack = 2
-  if type["segments"]:
+  if segments:
     payload = "true"
   else:
     payload = "false"
@@ -86,6 +88,10 @@
 for f in fields:
   if not f.empty:
     out("    private $(f.type) $(f.name);\n")
+
+if segments:
+  out("    private Header header;\n")
+  out("    private ByteBuffer body;\n")
 }
 
 ${
@@ -99,6 +105,10 @@
   if f.option: continue
   out("        $(f.set)($(f.name));\n")
 
+if segments:
+  out("        setHeader(header);\n")
+  out("        setBody(body);\n")
+
 if options or base == "Method":
   out("""
         for (int i=0; i < _options.length; i++) {
@@ -154,7 +164,6 @@
     }
 
     public final $name $(f.set)($(f.type) value) {
-        $(f.check)
 ${
 if not f.empty:
   out("        this.$(f.name) = value;")
@@ -173,6 +182,44 @@
 """)
 }
 
+${
+if segments:
+    out("""    public final Header getHeader() {
+        return this.header;
+    }
+
+    public final void setHeader(Header header) {
+        this.header = header;
+    }
+
+    public final $name header(Header header) {
+        setHeader(header);
+        return this;
+    }
+
+    public final ByteBuffer getBody() {
+        if (this.body == null)
+        {
+            return null;
+        }
+        else
+        {
+            return this.body.slice();
+        }
+    }
+
+    public final void setBody(ByteBuffer body) {
+        this.body = body;
+    }
+
+    public final $name body(ByteBuffer body)
+    {
+        setBody(body);
+        return this;
+    }
+""")
+}
+
     public void write(Encoder enc)
     {
 ${

Modified: incubator/qpid/trunk/qpid/java/common/Invoker.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Invoker.tpl?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Invoker.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Invoker.tpl Tue Aug  5 12:33:11 2008
@@ -1,5 +1,6 @@
 package org.apache.qpid.transport;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -32,9 +33,9 @@
     jclass = ""
 
   out("""
-     public final $jresult $(dromedary(name))($(", ".join(params))) {
-         $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
-     }
+    public final $jresult $(dromedary(name))($(", ".join(params))) {
+        $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
+    }
 """)
 }
 

Modified: incubator/qpid/trunk/qpid/java/common/genutil.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/genutil.py?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/genutil.py (original)
+++ incubator/qpid/trunk/qpid/java/common/genutil.py Tue Aug  5 12:33:11 2008
@@ -170,18 +170,15 @@
     if self.type_node.name == "struct":
       self.read = "(%s) dec.readStruct(%s.TYPE)" % (tname, tname)
       self.write = "enc.writeStruct(%s.TYPE, check(struct).%s)" % (tname, self.name)
-      self.check = ""
       self.coder = "Struct"
     elif self.type_node.name == "domain":
       self.coder = camel(0, self.prim_type["@name"])
       self.read = "%s.get(dec.read%s())" % (tname, self.coder)
       self.write = "enc.write%s(check(struct).%s.getValue())" % (self.coder, self.name)
-      self.check = ""
     else:
       self.coder = camel(0, self.type_node["@name"])
       self.read = "dec.read%s()" % self.coder
       self.write = "enc.write%s(check(struct).%s)" % (self.coder, self.name)
-      self.check = "Validator.check%s(value);" % self.coder
     self.type = jtype(self.type_node)
     self.default = DEFAULTS.get(self.type, "null")
     self.has = camel(1, "has", self.name)
@@ -214,6 +211,9 @@
       options = True
     else:
       params.append("%s %s" % (f.type, f.name))
+  if type["segments"]:
+    params.append("Header header")
+    params.append("ByteBuffer body")
   if options or type.name in ("control", "command"):
     params.append("Option ... _options")
   return params
@@ -226,6 +226,9 @@
       options = True
     else:
       args.append(f.name)
+  if type["segments"]:
+    args.append("header")
+    args.append("body")
   if options or type.name in ("control", "command"):
     args.append("_options")
   return args

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyBroker.java Tue Aug  5 12:33:11 2008
@@ -45,10 +45,6 @@
 {
 
     private ToyExchange exchange;
-    private MessageTransfer xfr = null;
-    private DeliveryProperties props = null;
-    private Header header = null;
-    private List<Data> body = null;
     private Map<String,Consumer> consumers = new ConcurrentHashMap<String,Consumer>();
 
     public ToyBroker(ToyExchange exchange)
@@ -103,22 +99,10 @@
 
     @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
     {
-        this.xfr = xfr;
-        body = new ArrayList<Data>();
-        System.out.println("received transfer " + xfr.getDestination());
-    }
-
-    @Override public void header(Session ssn, Header header)
-    {
-        if (xfr == null || body == null)
-        {
-            ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR,
-                                "no method segment");
-            ssn.close();
-            return;
-        }
-
-        props = header.get(DeliveryProperties.class);
+        String dest = xfr.getDestination();
+        System.out.println("received transfer " + dest);
+        Header header = xfr.getHeader();
+        DeliveryProperties props = header.get(DeliveryProperties.class);
         if (props != null)
         {
             System.out.println("received headers routing_key " + props.getRoutingKey());
@@ -130,67 +114,31 @@
             System.out.println(mp.getApplicationHeaders());
         }
 
-        this.header = header;
-    }
-
-    @Override public void data(Session ssn, Data data)
-    {
-        if (xfr == null || body == null)
+        if (exchange.route(dest,props.getRoutingKey(),xfr))
         {
-            ssn.connectionClose(ConnectionCloseCode.FRAMING_ERROR, "no method segment");
-            ssn.close();
-            return;
+            System.out.println("queued " + xfr);
+            dispatchMessages(ssn);
         }
-
-        body.add(data);
-
-        if (data.isLast())
+        else
         {
-            String dest = xfr.getDestination();
-            Message m = new Message(header, body);
 
-            if (exchange.route(dest,props.getRoutingKey(),m))
+            if (props == null || !props.getDiscardUnroutable())
             {
-                System.out.println("queued " + m);
-                dispatchMessages(ssn);
+                RangeSet ranges = new RangeSet();
+                ranges.add(xfr.getId());
+                ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
+                                  "no such destination");
             }
-            else
-            {
-
-                reject(ssn);
-            }
-            ssn.processed(xfr);
-            xfr = null;
-            body = null;
-        }
-    }
-
-    private void reject(Session ssn)
-    {
-        if (props != null && props.getDiscardUnroutable())
-        {
-            return;
-        }
-        else
-        {
-            RangeSet ranges = new RangeSet();
-            ranges.add(xfr.getId());
-            ssn.messageReject(ranges, MessageRejectCode.UNROUTABLE,
-                              "no such destination");
         }
+        ssn.processed(xfr);
     }
 
-    private void transferMessageToPeer(Session ssn,String dest, Message m)
+    private void transferMessageToPeer(Session ssn,String dest, MessageTransfer m)
     {
         System.out.println("\n==================> Transfering message to: " +dest + "\n");
-        ssn.messageTransfer(dest, MessageAcceptMode.EXPLICIT,
-                            MessageAcquireMode.PRE_ACQUIRED);
-        ssn.header(m.header);
-        for (Data d : m.body)
-        {
-            ssn.data(d.getData());
-        }
-        ssn.endData();
+        ssn.messageTransfer(m.getDestination(), MessageAcceptMode.EXPLICIT,
+                            MessageAcquireMode.PRE_ACQUIRED,
+                            m.getHeader(), m.getBody());
     }
 
     private void dispatchMessages(Session ssn)
@@ -204,8 +152,8 @@
     private void checkAndSendMessagesToConsumer(Session ssn,String dest)
     {
         Consumer c = consumers.get(dest);
-        LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName);
-        Message m = queue.poll();
+        LinkedBlockingQueue<MessageTransfer> queue = exchange.getQueue(c._queueName);
+        MessageTransfer m = queue.poll();
         while (m != null && c._credit>0)
         {
             transferMessageToPeer(ssn,dest,m);
@@ -214,43 +162,6 @@
         }
     }
 
-    class Message
-    {
-        private final Header header;
-        private final List<Data> body;
-
-        public Message(Header header, List<Data> body)
-        {
-            this.header = header;
-            this.body = body;
-        }
-
-        public String toString()
-        {
-            StringBuilder sb = new StringBuilder();
-
-            if (header != null)
-            {
-                boolean first = true;
-                for (Struct st : header.getStructs())
-                {
-                    if (first) { first = false; }
-                    else { sb.append(" "); }
-                    sb.append(st);
-                }
-            }
-
-            for (Data d : body)
-            {
-                sb.append(" | ");
-                sb.append(d);
-            }
-
-            return sb.toString();
-        }
-
-    }
-
     // ugly, but who cares :)
     // assumes unit is always no of messages, not bytes
     // assumes it's credit mode and not window

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyClient.java Tue Aug  5 12:33:11 2008
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid;
 
+import java.nio.*;
 import java.util.*;
 
 import org.apache.qpid.transport.*;
@@ -47,17 +48,9 @@
         }
     }
 
-    @Override public void header(Session ssn, Header header)
+    @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
     {
-        for (Struct st : header.getStructs())
-        {
-            System.out.println("header: " + st);
-        }
-    }
-
-    @Override public void data(Session ssn, Data data)
-    {
-        System.out.println("got data: " + data);
+        System.out.println("msg: " + xfr);
     }
 
     public static final void main(String[] args)
@@ -111,16 +104,16 @@
         map.put("binary", new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
 
         ssn.messageTransfer("asdf", MessageAcceptMode.EXPLICIT,
-                            MessageAcquireMode.PRE_ACQUIRED);
-        ssn.header(new DeliveryProperties(),
-                   new MessageProperties().setApplicationHeaders(map));
-        ssn.data("this is the data");
-        ssn.endData();
+                            MessageAcquireMode.PRE_ACQUIRED,
+                            new Header(new DeliveryProperties(),
+                                       new MessageProperties()
+                                       .setApplicationHeaders(map)),
+                            "this is the data");
 
         ssn.messageTransfer("fdsa", MessageAcceptMode.EXPLICIT,
-                            MessageAcquireMode.PRE_ACQUIRED);
-        ssn.data("this should be rejected");
-        ssn.endData();
+                            MessageAcquireMode.PRE_ACQUIRED,
+                            null,
+                            "this should be rejected");
         ssn.sync();
 
         Future<QueueQueryResult> future = ssn.queueQuery("asdf");

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/ToyExchange.java Tue Aug  5 12:33:11 2008
@@ -9,42 +9,43 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.qpid.ToyBroker.Message;
+import org.apache.qpid.transport.MessageTransfer;
+
 
 public class ToyExchange
 {
   final static String DIRECT = "amq.direct";
   final static String TOPIC = "amq.topic";
   
-  private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>();
-  private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<Message>>>(); 
-  private Map<String,LinkedBlockingQueue<Message>> queues = new HashMap<String,LinkedBlockingQueue<Message>>(); 
+  private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> directEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>();
+  private Map<String,List<LinkedBlockingQueue<MessageTransfer>>> topicEx = new HashMap<String,List<LinkedBlockingQueue<MessageTransfer>>>(); 
+  private Map<String,LinkedBlockingQueue<MessageTransfer>> queues = new HashMap<String,LinkedBlockingQueue<MessageTransfer>>(); 
   
   public void createQueue(String name)
   {
-      queues.put(name, new LinkedBlockingQueue<Message>());
+      queues.put(name, new LinkedBlockingQueue<MessageTransfer>());
   }
   
-  public LinkedBlockingQueue<Message> getQueue(String name)
+  public LinkedBlockingQueue<MessageTransfer> getQueue(String name)
   {
       return queues.get(name);
   }
   
   public void bindQueue(String type,String binding,String queueName)
   {
-      LinkedBlockingQueue<Message> queue = queues.get(queueName);
+      LinkedBlockingQueue<MessageTransfer> queue = queues.get(queueName);
       binding = normalizeKey(binding);
       if(DIRECT.equals(type))
       {
           
           if (directEx.containsKey(binding))
           {
-              List<LinkedBlockingQueue<Message>> list = directEx.get(binding);
+              List<LinkedBlockingQueue<MessageTransfer>> list = directEx.get(binding);
               list.add(queue);
           }
           else
           {
-              List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
+              List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
               list.add(queue);
               directEx.put(binding,list);
           }
@@ -53,21 +54,21 @@
       {
           if (topicEx.containsKey(binding))
           {
-              List<LinkedBlockingQueue<Message>> list = topicEx.get(binding);
+              List<LinkedBlockingQueue<MessageTransfer>> list = topicEx.get(binding);
               list.add(queue);
           }
           else
           {
-              List<LinkedBlockingQueue<Message>> list = new LinkedList<LinkedBlockingQueue<Message>>();
+              List<LinkedBlockingQueue<MessageTransfer>> list = new LinkedList<LinkedBlockingQueue<MessageTransfer>>();
               list.add(queue);
               topicEx.put(binding,list);
           }
       }
   }
   
-  public boolean route(String dest,String routingKey,Message msg)
+  public boolean route(String dest, String routingKey, MessageTransfer msg)
   {
-      List<LinkedBlockingQueue<Message>> queues;
+      List<LinkedBlockingQueue<MessageTransfer>> queues;
       if(DIRECT.equals(dest))
       {
           queues = directEx.get(routingKey);          
@@ -101,9 +102,9 @@
       }
   }
   
-  private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey)
+  private List<LinkedBlockingQueue<MessageTransfer>> matchWildCard(String routingKey)
   {        
-      List<LinkedBlockingQueue<Message>> selected = new ArrayList<LinkedBlockingQueue<Message>>();
+      List<LinkedBlockingQueue<MessageTransfer>> selected = new ArrayList<LinkedBlockingQueue<MessageTransfer>>();
       
       for(String key: topicEx.keySet())
       {
@@ -111,7 +112,7 @@
           Matcher m = p.matcher(routingKey);
           if (m.find())
           {
-              for(LinkedBlockingQueue<Message> queue : topicEx.get(key))
+              for(LinkedBlockingQueue<MessageTransfer> queue : topicEx.get(key))
               {
                     selected.add(queue);
               }
@@ -121,9 +122,9 @@
       return selected;      
   }
 
-  private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>> selected)
+  private void storeMessage(MessageTransfer msg,List<LinkedBlockingQueue<MessageTransfer>> selected)
   {
-      for(LinkedBlockingQueue<Message> queue : selected)
+      for(LinkedBlockingQueue<MessageTransfer> queue : selected)
       {
           queue.offer(msg);
       }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Tue Aug  5 12:33:11 2008
@@ -418,9 +418,15 @@
         return chars;
     }
 
+    private String str = null;
+
     public String asString()
     {
-        return new String(asChars());
+        if (str == null)
+        {
+            str = new String(asChars());
+        }
+        return str;
     }
 
     public boolean equals(Object o)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Channel.java Tue Aug  5 12:33:11 2008
@@ -53,11 +53,6 @@
     // session may be null
     private Session session;
 
-    private Lock commandLock = new ReentrantLock();
-    private boolean first = true;
-    private ByteBuffer data = null;
-    private boolean batch = false;
-
     public Channel(Connection connection, int channel, SessionDelegate delegate)
     {
         this.connection = connection;
@@ -105,16 +100,6 @@
         method.delegate(session, sessionDelegate);
     }
 
-    public void header(Void v, Header header)
-    {
-        header.delegate(session, sessionDelegate);
-    }
-
-    public void data(Void v, Data data)
-    {
-        data.delegate(session, sessionDelegate);
-    }
-
     public void error(Void v, ProtocolError error)
     {
         throw new RuntimeException(error.getMessage());
@@ -157,62 +142,12 @@
 
     public void method(Method m)
     {
-        if (m.getEncodedTrack() == Frame.L4)
-        {
-            commandLock.lock();
-        }
-
         emit(m);
 
-        if (!m.isBatch() && !m.hasPayload())
-        {
-            connection.flush();
-        }
-
-        batch = m.isBatch();
-
-        if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload())
-        {
-            commandLock.unlock();
-        }
-    }
-
-    public void header(Header header)
-    {
-        emit(header);
-    }
-
-    public void data(ByteBuffer buf)
-    {
-        if (data != null)
-        {
-            emit(new Data(data, first, false));
-            first = false;
-        }
-
-        data = buf;
-    }
-
-    public void data(String str)
-    {
-        data(str.getBytes());
-    }
-
-    public void data(byte[] bytes)
-    {
-        data(ByteBuffer.wrap(bytes));
-    }
-
-    public void end()
-    {
-        emit(new Data(data, first, true));
-        first = true;
-        data = null;
-        if (!batch)
+        if (!m.isBatch())
         {
             connection.flush();
         }
-        commandLock.unlock();
     }
 
     protected void invoke(Method m)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java Tue Aug  5 12:33:11 2008
@@ -40,22 +40,6 @@
     {
         this.xfr = xfr;
         ssn.invoke(xfr);
-    }
-
-    public void header(Session ssn, Header hdr)
-    {
-        ssn.header(hdr);
-    }
-
-    public void data(Session ssn, Data data)
-    {
-        ssn.data(data.getData());
-        if (data.isLast())
-        {
-            ssn.endData();
-        }
-
-        // XXX: should be able to get command-id from any segment
         ssn.processed(xfr);
     }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java Tue Aug  5 12:33:11 2008
@@ -22,6 +22,7 @@
 
 import org.apache.qpid.transport.network.Frame;
 
+import java.util.Arrays;
 import java.util.List;
 import java.nio.ByteBuffer;
 
@@ -32,33 +33,25 @@
  * @author Rafael H. Schloming
  */
 
-public class Header implements ProtocolEvent {
+public class Header {
 
     private final List<Struct> structs;
-    private ByteBuffer _buf;
-    private boolean _noPayload;
-    private int channel;
 
-    public Header(List<Struct> structs, boolean lastframe)
+    public Header(List<Struct> structs)
     {
         this.structs = structs;
-        _noPayload= lastframe;
     }
 
-    public List<Struct> getStructs()
+    public Header(Struct ... structs)
     {
-        return structs;
+        this(Arrays.asList(structs));
     }
 
-    public void setBuf(ByteBuffer buf)
+    public List<Struct> getStructs()
     {
-        _buf = buf;
+        return structs;
     }
 
-    public ByteBuffer getBuf()
-    {
-        return _buf;
-    }
     public <T> T get(Class<T> klass)
     {
         for (Struct st : structs)
@@ -72,36 +65,9 @@
         return null;
     }
 
-    public final int getChannel()
-    {
-        return channel;
-    }
-
-    public final void setChannel(int channel)
-    {
-        this.channel = channel;
-    }
-
-    public byte getEncodedTrack()
-    {
-        return Frame.L4;
-    }
-
-    public <C> void delegate(C context, ProtocolDelegate<C> delegate)
-    {
-        delegate.header(context, this);
-    }
-
-    public boolean hasNoPayload()
-    {
-        return _noPayload;
-    }
-
     public String toString()
     {
         StringBuffer str = new StringBuffer();
-        str.append("ch=");
-        str.append(channel);
         str.append(" Header(");
         boolean first = true;
         for (Struct s : structs)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Method.java Tue Aug  5 12:33:11 2008
@@ -22,6 +22,10 @@
 
 import org.apache.qpid.transport.network.Frame;
 
+import java.nio.ByteBuffer;
+
+import static org.apache.qpid.transport.util.Functions.*;
+
 /**
  * Method
  *
@@ -88,6 +92,26 @@
 
     public abstract boolean hasPayload();
 
+    public Header getHeader()
+    {
+        return null;
+    }
+
+    public void setHeader(Header header)
+    {
+        throw new UnsupportedOperationException();
+    }
+
+    public ByteBuffer getBody()
+    {
+        return null;
+    }
+
+    public void setBody(ByteBuffer body)
+    {
+        throw new UnsupportedOperationException();
+    }
+
     public abstract byte getEncodedTrack();
 
     public abstract <C> void dispatch(C context, MethodDelegate<C> delegate);
@@ -134,6 +158,21 @@
 
         str.append(" ");
         str.append(super.toString());
+        Header hdr = getHeader();
+        if (hdr != null)
+        {
+            for (Struct st : hdr.getStructs())
+            {
+                str.append("\n  ");
+                str.append(st);
+            }
+        }
+        ByteBuffer body = getBody();
+        if (body != null)
+        {
+            str.append("\n  body=");
+            str.append(str(body, 64));
+        }
 
         return str.toString();
     }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ProtocolDelegate.java Tue Aug  5 12:33:11 2008
@@ -35,10 +35,6 @@
 
     void command(C context, Method command);
 
-    void header(C context, Header header);
-
-    void data(C context, Data data);
-
     void error(C context, ProtocolError error);
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Tue Aug  5 12:33:11 2008
@@ -36,6 +36,7 @@
 import static org.apache.qpid.transport.Option.*;
 import static org.apache.qpid.transport.util.Functions.*;
 import static org.apache.qpid.util.Serial.*;
+import static org.apache.qpid.util.Strings.*;
 
 /**
  * Session
@@ -271,7 +272,7 @@
                 }
                 needSync = !m.isSync();
                 channel.method(m);
-                if (autoSync && !m.hasPayload())
+                if (autoSync)
                 {
                     sync();
                 }
@@ -290,50 +291,6 @@
         }
     }
 
-    public void header(Header header)
-    {
-        channel.header(header);
-    }
-
-    public Header header(List<Struct> structs)
-    {
-        Header res = new Header(structs, false);
-        header(res);
-        return res;
-    }
-
-    public Header header(Struct ... structs)
-    {
-        return header(Arrays.asList(structs));
-    }
-
-    public void data(ByteBuffer buf)
-    {
-        channel.data(buf);
-    }
-
-    public void data(String str)
-    {
-        channel.data(str);
-    }
-
-    public void data(byte[] bytes)
-    {
-        channel.data(bytes);
-    }
-
-    public void endData()
-    {
-        channel.end();
-        synchronized (commands)
-        {
-            if (autoSync)
-            {
-                sync();
-            }
-        }
-    }
-
     public void sync()
     {
         sync(timeout);
@@ -501,6 +458,26 @@
 
     }
 
+    public final void messageTransfer(String destination,
+                                      MessageAcceptMode acceptMode,
+                                      MessageAcquireMode acquireMode,
+                                      Header header,
+                                      byte[] body,
+                                      Option ... _options) {
+        messageTransfer(destination, acceptMode, acquireMode, header,
+                        ByteBuffer.wrap(body), _options);
+    }
+
+    public final void messageTransfer(String destination,
+                                      MessageAcceptMode acceptMode,
+                                      MessageAcquireMode acquireMode,
+                                      Header header,
+                                      String body,
+                                      Option ... _options) {
+        messageTransfer(destination, acceptMode, acquireMode, header,
+                        toUTF8(body), _options);
+    }
+
     public void close()
     {
         sessionRequestTimeout(0);

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java Tue Aug  5 12:33:11 2008
@@ -48,10 +48,6 @@
         }
     }
 
-    public void header(Session ssn, Header header) { }
-
-    public void data(Session ssn, Data data) { }
-
     public void error(Session ssn, ProtocolError error) { }
 
     @Override public void executionResult(Session ssn, ExecutionResult result)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/BBEncoder.java Tue Aug  5 12:33:11 2008
@@ -35,24 +35,29 @@
 {
 
     private ByteBuffer out;
+    private int segment;
 
     public BBEncoder(int capacity) {
         out = ByteBuffer.allocate(capacity);
         out.order(ByteOrder.BIG_ENDIAN);
+        segment = 0;
     }
 
     public void init()
     {
         out.clear();
+        segment = 0;
     }
 
-    public ByteBuffer done()
+    public ByteBuffer segment()
     {
-        out.flip();
-        ByteBuffer encoded = ByteBuffer.allocate(out.remaining());
-        encoded.put(out);
-        encoded.flip();
-        return encoded;
+        int pos = out.position();
+        out.position(segment);
+        ByteBuffer slice = out.slice();
+        slice.limit(pos - segment);
+        out.position(pos);
+        segment = pos;
+        return slice;
     }
 
     private void grow(int size)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/codec/Validator.java Tue Aug  5 12:33:11 2008
@@ -154,7 +154,7 @@
 
     public static final void checkMap(Map<String,Object> map)
     {
-        if (map == null)
+        if (map == null || map.isEmpty())
         {
             return;
         }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Tue Aug  5 12:33:11 2008
@@ -30,7 +30,6 @@
 import org.apache.qpid.transport.codec.BBDecoder;
 import org.apache.qpid.transport.codec.Decoder;
 
-import org.apache.qpid.transport.Data;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolError;
@@ -51,6 +50,7 @@
 
     private final Receiver<ProtocolEvent> receiver;
     private final Map<Integer,List<Frame>> segments;
+    private final Method[] incomplete;
     private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>()
     {
         public BBDecoder initialValue()
@@ -63,6 +63,7 @@
     {
         this.receiver = receiver;
         segments = new HashMap<Integer,List<Frame>>();
+        incomplete = new Method[64*1024];
     }
 
     private int segmentKey(Frame frame)
@@ -97,11 +98,6 @@
         receiver.received(event);
     }
 
-    private void emit(Frame frame, ProtocolEvent event)
-    {
-        emit(frame.getChannel(), event);
-    }
-
     public void received(NetworkEvent event)
     {
         event.delegate(this);
@@ -122,32 +118,18 @@
         emit(0, header);
     }
 
-    public void frame(Frame frame)
-    {
-        switch (frame.getType())
-        {
-        case BODY:
-            emit(frame, new Data(frame.getBody(), frame.isFirstFrame(),
-                                 frame.isLastFrame()));
-            break;
-        default:
-            assemble(frame);
-            break;
-        }
-    }
-
     public void error(ProtocolError error)
     {
         emit(0, error);
     }
 
-    private void assemble(Frame frame)
+    public void frame(Frame frame)
     {
         ByteBuffer segment;
         if (frame.isFirstFrame() && frame.isLastFrame())
         {
             segment = frame.getBody();
-            emit(frame, decode(frame, segment));
+            assemble(frame, segment);
         }
         else
         {
@@ -179,38 +161,63 @@
                     segment.put(f.getBody());
                 }
                 segment.flip();
-                emit(frame, decode(frame, segment));
+                assemble(frame, segment);
             }
         }
 
     }
 
-    private ProtocolEvent decode(Frame frame, ByteBuffer segment)
+    private void assemble(Frame frame, ByteBuffer segment)
     {
         BBDecoder dec = decoder.get();
         dec.init(segment);
 
+        int channel = frame.getChannel();
+        Method command;
+
         switch (frame.getType())
         {
         case CONTROL:
             int controlType = dec.readUint16();
             Method control = Method.create(controlType);
             control.read(dec);
-            return control;
+            emit(channel, control);
+            break;
         case COMMAND:
             int commandType = dec.readUint16();
             // read in the session header, right now we don't use it
             dec.readUint16();
-            Method command = Method.create(commandType);
+            command = Method.create(commandType);
             command.read(dec);
-            return command;
+            if (command.hasPayload())
+            {
+                incomplete[channel] = command;
+            }
+            else
+            {
+                emit(channel, command);
+            }
+            break;
         case HEADER:
+            command = incomplete[channel];
             List<Struct> structs = new ArrayList();
             while (dec.hasRemaining())
             {
                 structs.add(dec.readStruct32());
             }
-            return new Header(structs, frame.isLastFrame() && frame.isLastSegment());
+            command.setHeader(new Header(structs));
+            if (frame.isLastSegment())
+            {
+                incomplete[channel] = null;
+                emit(channel, command);
+            }
+            break;
+        case BODY:
+            command = incomplete[channel];
+            command.setBody(segment);
+            incomplete[channel] = null;
+            emit(channel, command);
+            break;
         default:
             throw new IllegalStateException("unknown frame type: " + frame.getType());
         }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Tue Aug  5 12:33:11 2008
@@ -22,7 +22,6 @@
 
 import org.apache.qpid.transport.codec.BBEncoder;
 
-import org.apache.qpid.transport.Data;
 import org.apache.qpid.transport.Header;
 import org.apache.qpid.transport.Method;
 import org.apache.qpid.transport.ProtocolDelegate;
@@ -34,7 +33,8 @@
 import org.apache.qpid.transport.Struct;
 
 import java.nio.ByteBuffer;
-import java.util.Iterator;
+import java.nio.ByteOrder;
+import java.util.List;
 
 import static org.apache.qpid.transport.network.Frame.*;
 
@@ -46,12 +46,14 @@
  *
  */
 
-public class Disassembler implements Sender<ProtocolEvent>,
-                                     ProtocolDelegate<Void>
+public final class Disassembler implements Sender<ProtocolEvent>,
+                                           ProtocolDelegate<Void>
 {
 
-    private final Sender<NetworkEvent> sender;
+    private final Sender<ByteBuffer> sender;
     private final int maxPayload;
+    private final ByteBuffer header;
+    private final Object sendlock = new Object();
     private final ThreadLocal<BBEncoder> encoder = new ThreadLocal()
     {
         public BBEncoder initialValue()
@@ -60,7 +62,7 @@
         }
     };
 
-    public Disassembler(Sender<NetworkEvent> sender, int maxFrame)
+    public Disassembler(Sender<ByteBuffer> sender, int maxFrame)
     {
         if (maxFrame <= HEADER_SIZE || maxFrame >= 64*1024)
         {
@@ -69,6 +71,8 @@
         }
         this.sender = sender;
         this.maxPayload  = maxFrame - HEADER_SIZE;
+        this.header =  ByteBuffer.allocate(HEADER_SIZE);
+        this.header.order(ByteOrder.BIG_ENDIAN);
 
     }
 
@@ -79,60 +83,80 @@
 
     public void flush()
     {
-        sender.flush();
+        synchronized (sendlock)
+        {
+            sender.flush();
+        }
     }
 
     public void close()
     {
-        sender.close();
+        synchronized (sendlock)
+        {
+            sender.close();
+        }
+    }
+
+    private final void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buf)
+    {
+        synchronized (sendlock)
+        {
+            header.put(0, flags);
+            header.put(1, type);
+            header.putShort(2, (short) (size + HEADER_SIZE));
+            header.put(5, track);
+            header.putShort(6, (short) channel);
+
+            header.rewind();
+
+            sender.send(header);
+
+            int limit = buf.limit();
+            buf.limit(buf.position() + size);
+            sender.send(buf);
+            buf.limit(limit);
+        }
     }
 
     private void fragment(byte flags, SegmentType type, ProtocolEvent event,
                           ByteBuffer buf, boolean first, boolean last)
     {
+        byte typeb = (byte) type.getValue();
         byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
 
-        if(!buf.hasRemaining())
+        int remaining = buf.remaining();
+        while (true)
         {
-            //empty data
-            byte nflags = flags;
+            int size = min(maxPayload, remaining);
+            remaining -= size;
+
+            byte newflags = flags;
             if (first)
             {
-                nflags |= FIRST_FRAME;
+                newflags |= FIRST_FRAME;
                 first = false;
             }
-            nflags |= LAST_FRAME;
-            Frame frame = new Frame(nflags, type, track, event.getChannel(), buf.slice());
-            sender.send(frame);
-        }
-        else
-        {
-            while (buf.hasRemaining())
-            {
-                ByteBuffer slice = buf.slice();
-                slice.limit(min(maxPayload, slice.remaining()));
-                buf.position(buf.position() + slice.remaining());
-
-                byte newflags = flags;
-                if (first)
-                {
-                    newflags |= FIRST_FRAME;
-                    first = false;
-                }
-                if (last && !buf.hasRemaining())
-                {
-                    newflags |= LAST_FRAME;
-                }
+            if (last && remaining == 0)
+            {
+                newflags |= LAST_FRAME;
+            }
+
+            frame(newflags, typeb, track, event.getChannel(), size, buf);
 
-                Frame frame = new Frame(newflags, type, track, event.getChannel(), slice);
-                sender.send(frame);
+            if (remaining == 0)
+            {
+                break;
             }
         }
     }
 
     public void init(Void v, ProtocolHeader header)
     {
-        sender.send(header);
+        synchronized (sendlock)
+        {
+            sender.send(header.toByteBuffer());
+            sender.flush();
+        }
     }
 
     public void control(Void v, Method method)
@@ -170,48 +194,43 @@
             }
         }
         method.write(enc);
-        ByteBuffer buf = enc.done();
+        ByteBuffer methodSeg = enc.segment();
 
         byte flags = FIRST_SEG;
 
-        if (!method.hasPayload())
+        boolean payload = method.hasPayload();
+        if (!payload)
         {
             flags |= LAST_SEG;
         }
 
-        fragment(flags, type, method, buf, true, true);
-    }
-
-    public void header(Void v, Header header)
-    {
-        ByteBuffer buf;
-        if (header.getBuf() == null)
+        ByteBuffer headerSeg = null;
+        if (payload)
         {
-            BBEncoder enc = encoder.get();
-            enc.init();
-            for (Struct st : header.getStructs())
+            final Header hdr = method.getHeader();
+            final List<Struct> structs = hdr.getStructs();
+            final int nstructs = structs.size();
+            for (int i = 0; i < nstructs; i++)
             {
-                enc.writeStruct32(st);
+                enc.writeStruct32(structs.get(i));
             }
-            buf = enc.done();
-            header.setBuf(buf);
+            headerSeg = enc.segment();
         }
-        else
+
+        synchronized (sendlock)
         {
-            buf = header.getBuf();
-            buf.flip();
+            fragment(flags, type, method, methodSeg, true, true);
+            if (payload)
+            {
+                fragment((byte) 0x0, SegmentType.HEADER, method, headerSeg, true, true);
+                fragment(LAST_SEG, SegmentType.BODY, method, method.getBody(), true, true);
+            }
         }
-        fragment((byte) 0x0, SegmentType.HEADER, header, buf, true, true);
-    }
-
-    public void data(Void v, Data data)
-    {
-        fragment(LAST_SEG, SegmentType.BODY, data, data.getData(), data.isFirst(), data.isLast());
     }
 
     public void error(Void v, ProtocolError error)
     {
-        sender.send(error);
+        throw new IllegalArgumentException("" + error);
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java Tue Aug  5 12:33:11 2008
@@ -23,7 +23,6 @@
 import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
@@ -48,8 +47,9 @@
     private final OutputStream out;
 
     private final byte[] buffer;
-    private final AtomicInteger head = new AtomicInteger(START);
-    private final AtomicInteger tail = new AtomicInteger(START);
+    private volatile int head = START;
+    private volatile int tail = START;
+    private volatile boolean idle = true;
     private final Object notFull = new Object();
     private final Object notEmpty = new Object();
     private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -96,16 +96,17 @@
 
         while (remaining > 0)
         {
-            final int hd = head.get();
-            final int tl = tail.get();
+            final int hd = head;
+            final int tl = tail;
 
             if (hd - tl >= size)
             {
+                flush();
                 synchronized (notFull)
                 {
                     long start = System.currentTimeMillis();
                     long elapsed = 0;
-                    while (head.get() - tail.get() >= size && elapsed < timeout)
+                    while (head - tail >= size && elapsed < timeout)
                     {
                         try
                         {
@@ -118,9 +119,9 @@
                         elapsed += System.currentTimeMillis() - start;
                     }
 
-                    if (head.get() - tail.get() >= size)
+                    if (head - tail >= size)
                     {
-                        throw new TransportException(String.format("write timed out: %s, %s", head.get(), tail.get()));
+                        throw new TransportException(String.format("write timed out: %s, %s", head, tail));
                     }
                 }
                 continue;
@@ -140,21 +141,20 @@
             }
 
             buf.get(buffer, hd_idx, length);
-            head.getAndAdd(length);
-            if (hd == tail.get())
-            {
-                synchronized (notEmpty)
-                {
-                    notEmpty.notify();
-                }
-            }
+            head += length;
             remaining -= length;
         }
     }
 
     public void flush()
     {
-        // pass
+        if (idle)
+        {
+            synchronized (notEmpty)
+            {
+                notEmpty.notify();
+            }
+        }
     }
 
     public void close()
@@ -206,8 +206,8 @@
 
         while (true)
         {
-            final int hd = head.get();
-            final int tl = tail.get();
+            final int hd = head;
+            final int tl = tail;
 
             if (hd == tl)
             {
@@ -216,9 +216,11 @@
                     break;
                 }
 
+                idle = true;
+
                 synchronized (notEmpty)
                 {
-                    while (head.get() == tail.get() && !closed.get())
+                    while (head == tail && !closed.get())
                     {
                         try
                         {
@@ -231,6 +233,8 @@
                     }
                 }
 
+                idle = false;
+
                 continue;
             }
 
@@ -258,8 +262,8 @@
                 close(false);
                 break;
             }
-            tail.getAndAdd(length);
-            if (head.get() - tl >= size)
+            tail += length;
+            if (head - tl >= size)
             {
                 synchronized (notFull)
                 {

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Tue Aug  5 12:33:11 2008
@@ -33,7 +33,6 @@
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
 import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.OutputHandler;
 import org.apache.qpid.transport.util.Logger;
 
 /**
@@ -48,6 +47,14 @@
 public final class IoTransport
 {
 
+    static
+    {
+        org.apache.mina.common.ByteBuffer.setAllocator
+            (new org.apache.mina.common.SimpleByteBufferAllocator());
+        org.apache.mina.common.ByteBuffer.setUseDirectBuffers
+            (Boolean.getBoolean("amqj.enableDirectBuffers"));
+    }
+
     private static final Logger log = Logger.get(IoTransport.class);
 
     private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
@@ -104,8 +111,7 @@
 
         sender = new IoSender(this, 2*writeBufferSize, timeout);
         Connection conn = new Connection
-            (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
-             delegate);
+            (new Disassembler(sender, 64*1024 - 1), delegate);
         receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
                                   2*readBufferSize, timeout);
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java Tue Aug  5 12:33:11 2008
@@ -44,7 +44,6 @@
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
 import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.OutputHandler;
 
 import static org.apache.qpid.transport.util.Functions.*;
 
@@ -292,7 +291,7 @@
         {
             // XXX: hardcoded max-frame
             return new Connection
-                (new Disassembler(new OutputHandler(sender), MAX_FRAME_SIZE), delegate);
+                (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
         }
 
         public Receiver<java.nio.ByteBuffer> receiver(Connection conn)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java?rev=682887&r1=682886&r2=682887&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/nio/NioHandler.java Tue Aug  5 12:33:11 2008
@@ -17,7 +17,6 @@
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.Disassembler;
 import org.apache.qpid.transport.network.InputHandler;
-import org.apache.qpid.transport.network.OutputHandler;
 
 public class NioHandler implements Runnable
 {
@@ -68,8 +67,7 @@
 
         NioSender sender = new NioSender(_ch);
         Connection con = new Connection
-            (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
-             delegate);
+            (new Disassembler(sender, 64*1024 - 1), delegate);
 
         con.setConnectionId(_count.incrementAndGet());
         _handlers.put(con.getConnectionId(),sender);