You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/07/09 15:26:56 UTC

svn commit: r675165 [1/2] - in /incubator/qpid/trunk/qpid/java: client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/ client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpidity/nclient/ client/src/main/java/...

Author: rhs
Date: Wed Jul  9 06:26:54 2008
New Revision: 675165

URL: http://svn.apache.org/viewvc?rev=675165&view=rev
Log:
Primarily profiling driven changes:

 - added batched writes of commands/controls issued on a session

 - copy fragmented frames and segments rather than trying to decode
   them piecemeal, removed FragmentDecoder

 - added caching for str8 encode/decode

 - compute sizes as we encode by going back and filling in the number
   of bytes written rather than computing them up front

 - added SYNC option to commands

 - renamed NO_OPTION argument to NONE

 - added a timeout to Client.java

 - removed use of UUID.fromString in BasicMessageProducer_0_10.java

Added:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java   (with props)
Removed:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/FragmentDecoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/SizeEncoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/Sizer.java
Modified:
    incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
    incubator/qpid/trunk/qpid/java/common/Composite.tpl
    incubator/qpid/trunk/qpid/java/common/Invoker.tpl
    incubator/qpid/trunk/qpid/java/common/Option.tpl
    incubator/qpid/trunk/qpid/java/common/genutil.py
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/OutputHandler.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/mina/MinaSender.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java

Modified: incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java Wed Jul  9 06:26:54 2008
@@ -55,7 +55,7 @@
                                  Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
                                  Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
                                  new MessagePartListenerAdapter(this),
-                                 null, Option.NO_OPTION);
+                                 null, Option.NONE);
         // issue credits
         // XXX: need to be able to set to null
         session.messageFlow(queueName, MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Jul  9 06:26:54 2008
@@ -250,9 +250,9 @@
     public void sendCreateQueue(AMQShortString name, final boolean autoDelete, final boolean durable,
                                 final boolean exclusive, Map<String, Object> arguments) throws AMQException, FailoverException
     {
-        getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NO_OPTION,
-                                      autoDelete ? Option.AUTO_DELETE : Option.NO_OPTION,
-                                      exclusive ? Option.EXCLUSIVE : Option.NO_OPTION);
+        getQpidSession().queueDeclare(name.toString(), null, arguments, durable ? Option.DURABLE : Option.NONE,
+                                      autoDelete ? Option.AUTO_DELETE : Option.NONE,
+                                      exclusive ? Option.EXCLUSIVE : Option.NONE);
         // We need to sync so that we get notify of an error.
         getQpidSession().sync();
         getCurrentException();
@@ -387,7 +387,7 @@
                                               getAcknowledgeMode() == NO_ACKNOWLEDGE ? Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED:Session.TRANSFER_CONFIRM_MODE_REQUIRED,
                                               preAcquire ? Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE : Session.TRANSFER_ACQUIRE_MODE_NO_ACQUIRE,
                                               new MessagePartListenerAdapter((BasicMessageConsumer_0_10) consumer), null,
-                                              consumer.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+                                              consumer.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
         }
         catch (JMSException e)
         {
@@ -477,9 +477,9 @@
             arguments.put("no-local", true);
         }
         getQpidSession().queueDeclare(res.toString(), null, arguments,
-                                      amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NO_OPTION,
-                                      amqd.isDurable() ? Option.DURABLE : Option.NO_OPTION,
-                                      !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NO_OPTION);
+                                      amqd.isAutoDelete() ? Option.AUTO_DELETE : Option.NONE,
+                                      amqd.isDurable() ? Option.DURABLE : Option.NONE,
+                                      !amqd.isDurable() && amqd.isExclusive() ? Option.EXCLUSIVE : Option.NONE);
         // passive --> false
         // We need to sync so that we get notify of an error.
         getQpidSession().sync();

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Wed Jul  9 06:26:54 2008
@@ -453,19 +453,21 @@
             }
         }
 
+        UUID messageId = null;
         if (_disableMessageId)
         {
             message.setJMSMessageID(null);
         }
         else
         {
+            messageId = UUID.randomUUID();
             StringBuilder b = new StringBuilder(39);
             b.append("ID:");
-            b.append(UUID.randomUUID());
+            b.append(messageId);
             message.setJMSMessageID(b.toString());
         }
 
-        sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
+        sendMessage(destination, origMessage, message, messageId, deliveryMode, priority, timeToLive, mandatory, immediate, wait);
 
         if (message != origMessage)
         {
@@ -484,8 +486,8 @@
     }
 
     abstract void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
-                              int deliveryMode, int priority, long timeToLive, boolean mandatory,
-                              boolean immediate, boolean wait)throws JMSException;
+                              UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+                              boolean immediate, boolean wait) throws JMSException;
 
     private void checkTemporaryDestination(AMQDestination destination) throws JMSException
     {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java Wed Jul  9 06:26:54 2008
@@ -68,8 +68,8 @@
      * Sends a message to a given destination
      */
     void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
-                     int deliveryMode, int priority, long timeToLive, boolean mandatory, boolean immediate,
-                     boolean wait) throws JMSException
+                     UUID messageId, int deliveryMode, int priority, long timeToLive, boolean mandatory,
+                     boolean immediate, boolean wait) throws JMSException
     {
         message.prepareForSending();
         if (message.get010Message() == null)
@@ -84,7 +84,16 @@
 
         DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties();
         MessageProperties messageProps = message.get010Message().getMessageProperties();
-        // set the delivery properties
+
+        if (messageId != null)
+        {
+            messageProps.setMessageId(messageId);
+        }
+        else if (messageProps.hasMessageId())
+        {
+            messageProps.clearMessageId();
+        }
+
         if (!_disableTimestamps)
         {
             final long currentTime = System.currentTimeMillis();
@@ -142,13 +151,6 @@
             messageProps.setContentType(contentHeaderProperties.getContentType().toString());
             messageProps.setContentLength(message.getContentLength());
 
-            // XXX: fixme
-            String mid = message.getJMSMessageID();
-            if( mid != null )
-            {
-                messageProps.setMessageId(UUID.fromString(mid.substring(3)));
-            }
-
             AMQShortString correlationID = contentHeaderProperties.getCorrelationId();
             if (correlationID != null)
             {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java Wed Jul  9 06:26:54 2008
@@ -20,6 +20,8 @@
  */
 package org.apache.qpid.client;
 
+import java.util.UUID;
+
 import javax.jms.JMSException;
 import javax.jms.Message;
 
@@ -65,9 +67,9 @@
         _protocolHandler.writeFrame(declare);
     }
 
-    void sendMessage(AMQDestination destination, Message origMessage,AbstractJMSMessage message,
-                     int deliveryMode,int priority, long timeToLive, boolean mandatory, boolean immediate,
-                     boolean wait) throws JMSException
+    void sendMessage(AMQDestination destination, Message origMessage, AbstractJMSMessage message,
+                     UUID messageId, int deliveryMode,int priority, long timeToLive, boolean mandatory,
+                     boolean immediate, boolean wait) throws JMSException
     {
         BasicPublishBody body = getSession().getMethodRegistry().createBasicPublishBody(_session.getTicket(),
                                                                                         destination.getExchangeName(),

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Wed Jul  9 06:26:54 2008
@@ -76,7 +76,7 @@
             _logger.debug("commit tx branch with xid:  ", xid);
         }
         Future<XaResult> future =
-                _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NO_OPTION);
+                _xaSession.getQpidSession().dtxCommit(convertXid(xid), b ? Option.ONE_PHASE : Option.NONE);
 
         // now wait on the future for the result
         XaResult result = null;
@@ -129,8 +129,8 @@
         }
         Future<XaResult> future = _xaSession.getQpidSession()
                 .dtxEnd(convertXid(xid),
-                        flag == XAResource.TMFAIL ? Option.FAIL : Option.NO_OPTION,
-                        flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NO_OPTION);
+                        flag == XAResource.TMFAIL ? Option.FAIL : Option.NONE,
+                        flag == XAResource.TMSUSPEND ? Option.SUSPEND : Option.NONE);
         // now wait on the future for the result
         XaResult result = null;
         try
@@ -400,8 +400,8 @@
         }
         Future<XaResult> future = _xaSession.getQpidSession()
                 .dtxStart(convertXid(xid),
-                        flag == XAResource.TMJOIN ? Option.JOIN : Option.NO_OPTION,
-                        flag == XAResource.TMRESUME ? Option.RESUME : Option.NO_OPTION);
+                        flag == XAResource.TMJOIN ? Option.JOIN : Option.NONE,
+                        flag == XAResource.TMRESUME ? Option.RESUME : Option.NONE);
         // now wait on the future for the result
         XaResult result = null;
         try

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java Wed Jul  9 06:26:54 2008
@@ -25,7 +25,6 @@
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.qpid.client.url.URLParser_0_10;
 import org.apache.qpid.jms.BrokerDetails;
@@ -60,6 +59,7 @@
     private static Logger _logger = LoggerFactory.getLogger(Client.class);
     private Condition closeOk;
     private boolean closed = false;
+    private long timeout = 60000;
 
     /**
      *
@@ -191,7 +191,7 @@
 
         try
         {
-            negotiationComplete.await();
+            negotiationComplete.await(timeout, TimeUnit.MILLISECONDS);
             if( connectionDelegate.getUnsupportedProtocol() != null )
             {
                 _conn.close();
@@ -202,7 +202,7 @@
         }
         catch (InterruptedException e)
         {
-            //
+            throw new RuntimeException(e);
         }
         finally
         {
@@ -257,7 +257,6 @@
         {
             try
             {
-                long timeout = 60000;
                 long start = System.currentTimeMillis();
                 long elapsed = 0;
                 while (!closed && elapsed < timeout)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java Wed Jul  9 06:26:54 2008
@@ -83,7 +83,7 @@
      *
      * @param xid Specifies the xid of the transaction branch to be forgotten.
      */
-    public void dtxForget(Xid xid);
+    public void dtxForget(Xid xid, Option ... options);
 
     /**
      * This method obtains the current transaction timeout value in seconds. If set-timeout was not
@@ -93,7 +93,7 @@
      * @param xid Specifies the xid of the transaction branch used for getting the timeout.
      * @return The current transaction timeout value in seconds.
      */
-    public Future<GetTimeoutResult> dtxGetTimeout(Xid xid);
+    public Future<GetTimeoutResult> dtxGetTimeout(Xid xid, Option ... options);
 
     /**
      * This method prepares any message produced or consumed on behalf of xid, ready for commitment.
@@ -109,14 +109,14 @@
      *         <p/>
      *         xa-rbtimeout: The work represented by this transaction branch took too long.
      */
-    public Future<XaResult> dtxPrepare(Xid xid);
+    public Future<XaResult> dtxPrepare(Xid xid, Option ... options);
 
     /**
      * This method is called to obtain a list of transaction branches that are in a prepared or
      * heuristically completed state.
      * @return a array of xids to be recovered.
      */
-    public Future<RecoverResult> dtxRecover();
+    public Future<RecoverResult> dtxRecover(Option ... options);
 
     /**
      * This method rolls back the work associated with xid. Any produced messages are discarded and
@@ -125,7 +125,7 @@
      * @param xid Specifies the xid of the transaction branch to be rolled back.
      * @return Confirms to the client that the transaction branch is rolled back or specifies the error condition.
      */
-    public Future<XaResult> dtxRollback(Xid xid);
+    public Future<XaResult> dtxRollback(Xid xid, Option ... options);
 
     /**
      * Sets the specified transaction branch timeout value in seconds.
@@ -133,5 +133,5 @@
      * @param xid     Specifies the xid of the transaction branch for setting the timeout.
      * @param timeout The transaction timeout value in seconds.
      */
-    public void dtxSetTimeout(Xid xid, long timeout);
+    public void dtxSetTimeout(Xid xid, long timeout, Option ... options);
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java Wed Jul  9 06:26:54 2008
@@ -65,9 +65,9 @@
 
     public void close();
 
-    public void sessionDetach(byte[] name);
+    public void sessionDetach(byte[] name, Option ... options);
 
-    public void sessionRequestTimeout(long expiry);
+    public void sessionRequestTimeout(long expiry, Option ... options);
 
     public byte[] getName();
 
@@ -153,7 +153,8 @@
      * 
      * @param acquireMode Indicates whether or not the transferred message has been acquired.
      */
-    public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode);
+    public void messageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode,
+                                Option ... options);
 
     /**
      * Make a set of headers to be sent together with a message
@@ -207,7 +208,7 @@
      * <ul>
      * <li>{@link Option#EXCLUSIVE}: <p> Requests exclusive subscription access, so that only this
      * subscription can access the queue.
-     * <li>{@link Option#NO_OPTION}: <p> This is an empty option, and has no effect.
+     * <li>{@link Option#NONE}: <p> This is an empty option, and has no effect.
      * </ul>
      *
      * @param queue       The queue that the receiver is receiving messages from.
@@ -230,7 +231,7 @@
      * @param listener    The listener for this destination. To transfer large messages
      *                    use a {@link org.apache.qpidity.nclient.MessagePartListener}.
      * @param options     Set of options. Valid options are {{@link Option#EXCLUSIVE}
-     *                    and {@link Option#NO_OPTION}.
+     *                    and {@link Option#NONE}.
      * @param filter      A set of filters for the subscription. The syntax and semantics of these filters varies
      *                    according to the provider's implementation.
      */
@@ -246,7 +247,7 @@
      *
      * @param destination The destination to be cancelled.
      */
-    public void messageCancel(String destination);
+    public void messageCancel(String destination, Option ... options);
 
     /**
      * Associate a message listener with a destination.
@@ -274,7 +275,7 @@
      * @param mode        <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control
      *                    <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul>
      */
-    public void messageSetFlowMode(String destination, MessageFlowMode mode);
+    public void messageSetFlowMode(String destination, MessageFlowMode mode, Option ... options);
 
 
     /**
@@ -295,7 +296,7 @@
      *                    </ul>
      * @param value       Number of credits, a value of 0 indicates an infinite amount of credit.
      */
-    public void messageFlow(String destination, MessageCreditUnit unit, long value);
+    public void messageFlow(String destination, MessageCreditUnit unit, long value, Option ... options);
 
     /**
      * Forces the broker to exhaust its credit supply.
@@ -304,7 +305,7 @@
      *
      * @param destination The destination on which the credit supply is to be exhausted.
      */
-    public void messageFlush(String destination);
+    public void messageFlush(String destination, Option ... options);
 
     /**
      * On receipt of this method, the brokers set credit to zero for a given
@@ -314,7 +315,7 @@
      *
      * @param destination The destination on which to reset credit.
      */
-    public void messageStop(String destination);
+    public void messageStop(String destination, Option ... options);
 
     /**
      * Acknowledge the receipt of a range of messages.
@@ -338,7 +339,7 @@
      *               failed).
      * @param text   String describing the reason for a message transfer rejection.
      */
-    public void messageReject(RangeSet ranges, MessageRejectCode code, String text);
+    public void messageReject(RangeSet ranges, MessageRejectCode code, String text, Option ... options);
 
     /**
      * As it is possible that the broker does not manage to reject some messages, after completion of
@@ -367,7 +368,7 @@
      * @param ranges Ranges of messages to be acquired.
      * @return Indicates the acquired messages
      */
-    public Future<Acquired> messageAcquire(RangeSet ranges);
+    public Future<Acquired> messageAcquire(RangeSet ranges, Option ... options);
 
     /**
      * Give up responsibility for processing ranges of messages.
@@ -384,21 +385,21 @@
     /**
      * Selects the session for local transaction support.
      */
-    public void txSelect();
+    public void txSelect(Option ... options);
 
     /**
      * Commit the receipt and delivery of all messages exchanged by this session's resources.
      *
      * @throws IllegalStateException If this session is not transacted, an exception will be thrown.
      */
-    public void txCommit() throws IllegalStateException;
+    public void txCommit(Option ... options) throws IllegalStateException;
 
     /**
      * Roll back the receipt and delivery of all messages exchanged by this session's resources.
      *
      * @throws IllegalStateException If this session is not transacted, an exception will be thrown.
      */
-    public void txRollback() throws IllegalStateException;
+    public void txRollback(Option ... options) throws IllegalStateException;
 
     //---------------------------------------------
     //            Queue methods
@@ -423,7 +424,7 @@
      * declaring connection closes.
      * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue.
      * This field allows the client to assert the presence of a queue without modifying the server state.
-     * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+     * <li>{@link Option#NONE}: <p> Has no effect as it represents an “empty” option.
      * </ul>
      * <p>In the absence of a particular option, the defaul value is false for each option
      *
@@ -435,7 +436,7 @@
      *                          the queue. </ol>
      * @param arguments         Used for backward compatibility
      * @param options           Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
-     *                          {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and  {@link Option#NO_OPTION})
+     *                          {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and  {@link Option#NONE})
      * @see Option
      */
     public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments,
@@ -456,7 +457,8 @@
      *                     routing keys depends on the exchange implementation.
      * @param arguments    Used for backward compatibility
      */
-    public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments);
+    public void exchangeBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments,
+                             Option ... options);
 
     /**
      * Unbind a queue from an exchange.
@@ -465,7 +467,7 @@
      * @param exchangeName The name of the exchange to unbind from.
      * @param routingKey   Specifies the routing key of the binding to unbind.
      */
-    public void exchangeUnbind(String queueName, String exchangeName, String routingKey);
+    public void exchangeUnbind(String queueName, String exchangeName, String routingKey, Option ... options);
 
     /**
      * This method removes all messages from a queue. It does not cancel consumers. Purged messages
@@ -474,7 +476,7 @@
      * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the
      *                  current queue for the session, which is the last declared queue.
      */
-    public void queuePurge(String queueName);
+    public void queuePurge(String queueName, Option ... options);
 
     /**
      * This method deletes a queue. When a queue is deleted any pending messages are sent to a
@@ -485,7 +487,7 @@
      * <li> {@link Option#IF_EMPTY}: <p>  If set, the server will only delete the queue if it has no messages.
      * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers.
      * If the queue has consumers the server does does not delete it but raises a channel exception instead.
-     * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an “empty” option.
+     * <li>{@link Option#NONE}: <p> Has no effect as it represents an “empty” option.
      * </ul>
      * </p>
      * <p/>
@@ -494,7 +496,7 @@
      * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the
      *                  current queue for the session, which is the last declared queue.
      * @param options   Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED}
-     *                  and {@link Option#NO_OPTION})
+     *                  and {@link Option#NONE})
      * @see Option
      */
     public void queueDelete(String queueName, Option... options);
@@ -506,7 +508,7 @@
      * @param queueName The name of the queue for which information is requested.
      * @return Information on the specified queue.
      */
-    public Future<QueueQueryResult> queueQuery(String queueName);
+    public Future<QueueQueryResult> queueQuery(String queueName, Option ... options);
 
 
     /**
@@ -519,7 +521,7 @@
      * @return Information on the specified binding.
      */
     public Future<ExchangeBoundResult> exchangeBound(String exchange, String queue, String routingKey,
-                                                     Map<String, Object> arguments);
+                                                     Map<String, Object> arguments, Option ... options);
 
     // --------------------------------------
     //              exhcange methods
@@ -536,7 +538,7 @@
      * exchanges) are purged when a server restarts.
      * <li>{@link Option#PASSIVE}: <p>If set, the server will not create the exchange.
      * The client can use this to check whether an exchange exists without modifying the server state.
-     * <li> {@link Option#NO_OPTION}: <p>This option is an empty option, and has no effect.
+     * <li> {@link Option#NONE}: <p>This option is an empty option, and has no effect.
      * </ul>
      * <p>In the absence of a particular option, the defaul value is false for each option</p>
      *
@@ -548,7 +550,7 @@
      * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
      *                          the message will be sent.
      * @param options           Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
-     *                          {@link Option#PASSIVE}, {@link Option#NO_OPTION})
+     *                          {@link Option#PASSIVE}, {@link Option#NONE})
      * @param arguments         Used for backward compatibility
      * @see Option
      */
@@ -563,12 +565,12 @@
      * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the
      * exchange has queue bindings the server does not delete it but raises a channel exception
      * instead.
-     * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an empty option.
+     * <li> {@link Option#NONE}: <p> Has no effect as it represents an empty option.
      * </ul>
      * <p>Note that if an option is not set, it will default to false.
      *
      * @param exchangeName The name of exchange to be deleted.
-     * @param options      Set of options. Valid options are:  {@link Option#IF_UNUSED}, {@link Option#NO_OPTION}.
+     * @param options      Set of options. Valid options are:  {@link Option#IF_UNUSED}, {@link Option#NONE}.
      * @see Option
      */
     public void exchangeDelete(String exchangeName, Option... options);
@@ -581,7 +583,7 @@
      *                     return information about the default exchange.
      * @return Information on the specified exchange.
      */
-    public Future<ExchangeQueryResult> exchangeQuery(String exchangeName);
+    public Future<ExchangeQueryResult> exchangeQuery(String exchangeName, Option ... options);
 
     /**
      * If the session receives a sessionClosed with an error code it

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Wed Jul  9 06:26:54 2008
@@ -17,6 +17,8 @@
 import org.apache.qpidity.transport.Range;
 import org.apache.qpidity.transport.RangeSet;
 
+import static org.apache.qpidity.transport.Option.*;
+
 /**
  * Implements a Qpid Sesion.
  */
@@ -66,8 +68,8 @@
         {
             super.processed(range);
         }
-        super.flushProcessed();
-        if( accept )
+        super.flushProcessed(accept ? BATCH : NONE);
+        if (accept)
         {
             messageAccept(ranges);
         }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java Wed Jul  9 06:26:54 2008
@@ -32,15 +32,11 @@
     // --------------------------------------------
     @Override public void data(Session ssn, Data data)
     {
-        for (ByteBuffer b : data.getFragments())
-        {    
-            _currentMessageListener.data(b);
-        }
+        _currentMessageListener.data(data.getData());
         if (data.isLast())
         {
             _currentMessageListener.messageReceived();
         }
-        
     }
 
     @Override public void header(Session ssn, Header header)

Modified: incubator/qpid/trunk/qpid/java/common/Composite.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Composite.tpl?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Composite.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Composite.tpl Wed Jul  9 06:26:54 2008
@@ -80,7 +80,7 @@
   out("    private $(PACK_TYPES[pack]) packing_flags = 0;\n");
 
 fields = get_fields(type)
-params = get_parameters(fields)
+params = get_parameters(type, fields)
 options = get_options(fields)
 
 for f in fields:
@@ -99,7 +99,7 @@
   if f.option: continue
   out("        $(f.set)($(f.name));\n")
 
-if options:
+if options or base == "Method":
   out("""
         for (int i=0; i < _options.length; i++) {
             switch (_options[i]) {
@@ -108,7 +108,11 @@
   for f in options:
     out("            case $(f.option): packing_flags |= $(f.flag_mask(pack)); break;\n")
 
-  out("""            case NO_OPTION: break;
+  if base == "Method":
+    out("""            case SYNC: this.setSync(true); break;
+            case BATCH: this.setBatch(true); break;
+""")
+  out("""            case NONE: break;
             default: throw new IllegalArgumentException("invalid option: " + _options[i]);
             }
         }

Modified: incubator/qpid/trunk/qpid/java/common/Invoker.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Invoker.tpl?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Invoker.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Invoker.tpl Wed Jul  9 06:26:54 2008
@@ -15,8 +15,8 @@
 for c in composites:
   name = cname(c)
   fields = get_fields(c)
-  params = get_parameters(fields)
-  args = get_arguments(fields)
+  params = get_parameters(c, fields)
+  args = get_arguments(c, fields)
   result = c["result"]
   if result:
     if not result["@type"]:
@@ -32,7 +32,7 @@
     jclass = ""
 
   out("""
-     public $jresult $(dromedary(name))($(", ".join(params))) {
+     public final $jresult $(dromedary(name))($(", ".join(params))) {
          $(jreturn)invoke(new $name($(", ".join(args)))$jclass);
      }
 """)

Modified: incubator/qpid/trunk/qpid/java/common/Option.tpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Option.tpl?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Option.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Option.tpl Wed Jul  9 06:26:54 2008
@@ -15,5 +15,6 @@
       if not options.has_key(option):
         options[option] = None
         out("    $option,\n")}
-    NO_OPTION
+    BATCH,
+    NONE
 }

Modified: incubator/qpid/trunk/qpid/java/common/genutil.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/genutil.py?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/genutil.py (original)
+++ incubator/qpid/trunk/qpid/java/common/genutil.py Wed Jul  9 06:26:54 2008
@@ -206,7 +206,7 @@
     index += 1
   return fields
 
-def get_parameters(fields):
+def get_parameters(type, fields):
   params = []
   options = False
   for f in fields:
@@ -214,11 +214,11 @@
       options = True
     else:
       params.append("%s %s" % (f.type, f.name))
-  if options:
+  if options or type.name in ("control", "command"):
     params.append("Option ... _options")
   return params
 
-def get_arguments(fields):
+def get_arguments(type, fields):
   args = []
   options = False
   for f in fields:
@@ -226,7 +226,7 @@
       options = True
     else:
       args.append(f.name)
-  if options:
+  if options or type.name in ("control", "command"):
     args.append("_options")
   return args
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConsoleOutput.java Wed Jul  9 06:26:54 2008
@@ -41,6 +41,11 @@
         System.out.println(str(buf));
     }
 
+    public void flush()
+    {
+        // pass
+    }
+
     public void close()
     {
         System.out.println("CLOSED");

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java Wed Jul  9 06:26:54 2008
@@ -188,10 +188,7 @@
         ssn.header(m.header);
         for (Data d : m.body)
         {
-            for (ByteBuffer b : d.getFragments())
-            {
-                ssn.data(b);
-            }
+            ssn.data(d.getData());
         }
         ssn.endData();
     }
@@ -245,11 +242,8 @@
 
             for (Data d : body)
             {
-                for (ByteBuffer b : d.getFragments())
-                {
-                    sb.append(" | ");
-                    sb.append(str(b));
-                }
+                sb.append(" | ");
+                sb.append(d);
             }
 
             return sb.toString();

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java?rev=675165&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java Wed Jul  9 06:26:54 2008
@@ -0,0 +1,129 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport;
+
+
+/**
+ * Binary
+ *
+ */
+
+public final class Binary
+{
+
+    private byte[] bytes;
+    private int offset;
+    private int size;
+    private int hash = 0;
+
+    public Binary(byte[] bytes, int offset, int size)
+    {
+        if (offset + size > bytes.length)
+        {
+            throw new ArrayIndexOutOfBoundsException();
+        }
+
+        this.bytes = bytes;
+        this.offset = offset;
+        this.size = size;
+    }
+
+    public Binary(byte[] bytes)
+    {
+        this(bytes, 0, bytes.length);
+    }
+
+    public final byte[] array()
+    {
+        return bytes;
+    }
+
+    public final int offset()
+    {
+        return offset;
+    }
+
+    public final int size()
+    {
+        return size;
+    }
+
+    public final Binary slice(int low, int high)
+    {
+        int sz;
+
+        if (high < 0)
+        {
+            sz = size + high;
+        }
+        else
+        {
+            sz = high - low;
+        }
+
+        if (sz < 0)
+        {
+            sz = 0;
+        }
+
+        return new Binary(bytes, offset + low, sz);
+    }
+
+    public final int hashCode()
+    {
+        if (hash == 0)
+        {
+            int hc = 0;
+            for (int i = 0; i < size; i++)
+            {
+                hc = 31*hc + (0xFF & bytes[offset + i]);
+            }
+            hash = hc;
+        }
+
+        return hash;
+    }
+
+    public final boolean equals(Object o)
+    {
+        if (!(o instanceof Binary))
+        {
+            return false;
+        }
+
+        Binary buf = (Binary) o;
+        if (this.size != buf.size)
+        {
+            return false;
+        }
+
+        for (int i = 0; i < size; i++)
+        {
+            if (bytes[offset + i] != buf.bytes[buf.offset + i])
+            {
+                return false;
+            }
+        }
+
+        return true;
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Binary.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java Wed Jul  9 06:26:54 2008
@@ -56,6 +56,7 @@
     private Lock commandLock = new ReentrantLock();
     private boolean first = true;
     private ByteBuffer data = null;
+    private boolean batch = false;
 
     public Channel(Connection connection, int channel, SessionDelegate delegate)
     {
@@ -162,6 +163,13 @@
 
         emit(m);
 
+        if (!m.isBatch() && !m.hasPayload())
+        {
+            connection.flush();
+        }
+
+        batch = m.isBatch();
+
         if (m.getEncodedTrack() == Frame.L4 && !m.hasPayload())
         {
             commandLock.unlock();
@@ -199,6 +207,10 @@
         emit(new Data(data, first, true));
         first = true;
         data = null;
+        if (!batch)
+        {
+            connection.flush();
+        }
         commandLock.unlock();
     }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java Wed Jul  9 06:26:54 2008
@@ -40,7 +40,6 @@
  * short instead of Short
  */
 
-// RA making this public until we sort out the package issues
 public class Connection
     implements Receiver<ConnectionEvent>, Sender<ConnectionEvent>
 {
@@ -90,6 +89,12 @@
         sender.send(event);
     }
 
+    public void flush()
+    {
+        log.debug("FLUSH: [%s]", this);
+        sender.flush();
+    }
+
     public int getChannelMax()
     {
         return channelMax;

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Data.java Wed Jul  9 06:26:54 2008
@@ -37,25 +37,20 @@
 public class Data implements ProtocolEvent
 {
 
-    private final Iterable<ByteBuffer> fragments;
+    private final ByteBuffer data;
     private final boolean first;
     private final boolean last;
 
-    public Data(Iterable<ByteBuffer> fragments, boolean first, boolean last)
+    public Data(ByteBuffer data, boolean first, boolean last)
     {
-        this.fragments = fragments;
+        this.data = data;
         this.first = first;
         this.last = last;
     }
 
-    public Data(ByteBuffer buf, boolean first, boolean last)
+    public ByteBuffer getData()
     {
-        this(Collections.singletonList(buf), first, last);
-    }
-
-    public Iterable<ByteBuffer> getFragments()
-    {
-        return fragments;
+        return data.slice();
     }
 
     public boolean isFirst()
@@ -82,25 +77,7 @@
     {
         StringBuffer str = new StringBuffer();
         str.append("Data(");
-        boolean first = true;
-        int left = 64;
-        for (ByteBuffer buf : getFragments())
-        {
-            if (first)
-            {
-                first = false;
-            }
-            else
-            {
-                str.append(" | ");
-            }
-            str.append(str(buf, left));
-            left -= buf.remaining();
-            if (left < 0)
-            {
-                break;
-            }
-        }
+        str.append(str(data, 64));
         str.append(")");
         return str.toString();
     }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Echo.java Wed Jul  9 06:26:54 2008
@@ -49,10 +49,7 @@
 
     public void data(Session ssn, Data data)
     {
-        for (ByteBuffer buf : data.getFragments())
-        {
-            ssn.data(buf);
-        }
+        ssn.data(data.getData());
         if (data.isLast())
         {
             ssn.endData();

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Method.java Wed Jul  9 06:26:54 2008
@@ -42,6 +42,7 @@
     private int id;
     private boolean idSet = false;
     private boolean sync = false;
+    private boolean batch = false;
 
     public final int getId()
     {
@@ -59,11 +60,21 @@
         return sync;
     }
 
-    void setSync(boolean value)
+    final void setSync(boolean value)
     {
         this.sync = value;
     }
 
+    public final boolean isBatch()
+    {
+        return batch;
+    }
+
+    final void setBatch(boolean value)
+    {
+        this.batch = value;
+    }
+
     public abstract boolean hasPayload();
 
     public abstract byte getEncodedTrack();
@@ -84,26 +95,30 @@
 
     public String toString()
     {
-        if (getEncodedTrack() != Frame.L4)
-        {
-            return super.toString();
-        }
-
         StringBuilder str = new StringBuilder();
 
-        if (idSet)
+        if (getEncodedTrack() == Frame.L4 && idSet)
         {
             str.append("id=");
             str.append(id);
         }
 
-        if (sync)
+        if (sync || batch)
         {
             if (str.length() > 0)
             {
                 str.append(" ");
             }
-            str.append(" [sync]");
+            str.append("[");
+            if (sync)
+            {
+                str.append("S");
+            }
+            if (batch)
+            {
+                str.append("B");
+            }
+            str.append("]");
         }
 
         if (str.length() > 0)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Sender.java Wed Jul  9 06:26:54 2008
@@ -31,6 +31,8 @@
 
     void send(T msg);
 
+    void flush();
+
     void close();
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java Wed Jul  9 06:26:54 2008
@@ -130,7 +130,7 @@
         log.debug("ID: [%s] %s", this.channel, id);
         if ((id % 65536) == 0)
         {
-            flushProcessed(true);
+            flushProcessed(TIMELY_REPLY);
         }
     }
 
@@ -166,19 +166,14 @@
         }
     }
 
-    public void flushProcessed()
-    {
-        flushProcessed(false);
-    }
-
-    private void flushProcessed(boolean timely_reply)
+    public void flushProcessed(Option ... options)
     {
         RangeSet copy;
         synchronized (processedLock)
         {
             copy = processed.copy();
         }
-        sessionCompleted(copy, timely_reply ? TIMELY_REPLY : NO_OPTION);
+        sessionCompleted(copy, options);
     }
 
     void knownComplete(RangeSet kc)
@@ -353,9 +348,7 @@
 
             if (needSync && lt(maxComplete, point))
             {
-                ExecutionSync sync = new ExecutionSync();
-                sync.setSync(true);
-                invoke(sync);
+                executionSync(SYNC);
             }
 
             long start = System.currentTimeMillis();

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractDecoder.java Wed Jul  9 06:26:54 2008
@@ -29,6 +29,7 @@
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpidity.transport.Binary;
 import org.apache.qpidity.transport.RangeSet;
 import org.apache.qpidity.transport.Struct;
 import org.apache.qpidity.transport.Type;
@@ -45,6 +46,14 @@
 abstract class AbstractDecoder implements Decoder
 {
 
+    private final Map<Binary,String> str8cache = new LinkedHashMap<Binary,String>()
+    {
+        @Override protected boolean removeEldestEntry(Map.Entry<Binary,String> me)
+        {
+            return size() > 4*1024;
+        }
+    };
+
     protected abstract byte doGet();
 
     protected abstract void doGet(byte[] bytes);
@@ -59,6 +68,13 @@
         doGet(bytes);
     }
 
+    protected Binary get(int size)
+    {
+        byte[] bytes = new byte[size];
+        get(bytes);
+        return new Binary(bytes);
+    }
+
     protected short uget()
     {
         return (short) (0xFF & get());
@@ -105,11 +121,11 @@
         return readUint64();
     }
 
-    private static final String decode(byte[] bytes, String charset)
+    private static final String decode(byte[] bytes, int offset, int length, String charset)
     {
         try
         {
-            return new String(bytes, charset);
+            return new String(bytes, offset, length, charset);
         }
         catch (UnsupportedEncodingException e)
         {
@@ -117,13 +133,22 @@
         }
     }
 
+    private static final String decode(byte[] bytes, String charset)
+    {
+        return decode(bytes, 0, bytes.length, charset);
+    }
 
     public String readStr8()
     {
         short size = readUint8();
-        byte[] bytes = new byte[size];
-        get(bytes);
-        return decode(bytes, "UTF-8");
+        Binary bin = get(size);
+        String str = str8cache.get(bin);
+        if (str == null)
+        {
+            str = decode(bin.array(), bin.offset(), bin.size(), "UTF-8");
+            str8cache.put(bin, str);
+        }
+        return str;
     }
 
     public String readStr16()
@@ -233,7 +258,19 @@
     public Map<String,Object> readMap()
     {
         long size = readUint32();
+
+        if (size == 0)
+        {
+            return null;
+        }
+
         long count = readUint32();
+
+        if (count == 0)
+        {
+            return Collections.EMPTY_MAP;
+        }
+
         Map<String,Object> result = new LinkedHashMap();
         for (int i = 0; i < count; i++)
         {
@@ -243,13 +280,26 @@
             Object value = read(t);
             result.put(key, value);
         }
+
         return result;
     }
 
     public List<Object> readList()
     {
         long size = readUint32();
+
+        if (size == 0)
+        {
+            return null;
+        }
+
         long count = readUint32();
+
+        if (count == 0)
+        {
+            return Collections.EMPTY_LIST;
+        }
+
         List<Object> result = new ArrayList();
         for (int i = 0; i < count; i++)
         {
@@ -264,15 +314,21 @@
     public List<Object> readArray()
     {
         long size = readUint32();
+
         if (size == 0)
         {
-            return Collections.EMPTY_LIST;
+            return null;
         }
 
         byte code = get();
         Type t = getType(code);
         long count = readUint32();
 
+        if (count == 0)
+        {
+            return Collections.EMPTY_LIST;
+        }
+
         List<Object> result = new ArrayList<Object>();
         for (int i = 0; i < count; i++)
         {

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/AbstractEncoder.java Wed Jul  9 06:26:54 2008
@@ -26,6 +26,7 @@
 
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
@@ -64,10 +65,13 @@
         ENCODINGS.put(byte[].class, Type.VBIN32);
     }
 
-    protected Sizer sizer()
+    private final Map<String,byte[]> str8cache = new LinkedHashMap<String,byte[]>()
     {
-        return new SizeEncoder();
-    }
+        @Override protected boolean removeEldestEntry(Map.Entry<String,byte[]> me)
+        {
+            return size() > 4*1024;
+        }
+    };
 
     protected abstract void doPut(byte b);
 
@@ -88,6 +92,15 @@
         put(ByteBuffer.wrap(bytes));
     }
 
+    protected abstract int beginSize8();
+    protected abstract void endSize8(int pos);
+
+    protected abstract int beginSize16();
+    protected abstract void endSize16(int pos);
+
+    protected abstract int beginSize32();
+    protected abstract void endSize32(int pos);
+
     public void writeUint8(short b)
     {
         assert b < 0x100;
@@ -132,23 +145,6 @@
         writeUint64(l);
     }
 
-    private static final String checkLength(String s, int n)
-    {
-        if (s == null)
-        {
-            return "";
-        }
-
-        if (s.length() > n)
-        {
-            throw new IllegalArgumentException("string too long: " + s);
-        }
-        else
-        {
-            return s;
-        }
-    }
-
     private static final byte[] encode(String s, String charset)
     {
         try
@@ -163,16 +159,31 @@
 
     public void writeStr8(String s)
     {
-        s = checkLength(s, 255);
-        writeUint8((short) s.length());
-        put(ByteBuffer.wrap(encode(s, "UTF-8")));
+        if (s == null)
+        {
+            s = "";
+        }
+
+        byte[] bytes = str8cache.get(s);
+        if (bytes == null)
+        {
+            bytes = encode(s, "UTF-8");
+            str8cache.put(s, bytes);
+        }
+        writeUint8((short) bytes.length);
+        put(bytes);
     }
 
     public void writeStr16(String s)
     {
-        s = checkLength(s, 65535);
-        writeUint16(s.length());
-        put(ByteBuffer.wrap(encode(s, "UTF-8")));
+        if (s == null)
+        {
+            s = "";
+        }
+
+        byte[] bytes = encode(s, "UTF-8");
+        writeUint16(bytes.length);
+        put(bytes);
     }
 
     public void writeVbin8(byte[] bytes)
@@ -245,18 +256,10 @@
         }
 
         int width = s.getSizeWidth();
+        int pos = -1;
         if (width > 0)
         {
-            if (empty)
-            {
-                writeSize(width, 0);
-            }
-            else
-            {
-                Sizer sizer = sizer();
-                s.write(sizer);
-                writeSize(width, sizer.size());
-            }
+            pos = beginSize(width);
         }
 
         if (type > 0)
@@ -265,6 +268,11 @@
         }
 
         s.write(this);
+
+        if (width > 0)
+        {
+            endSize(width, pos);
+        }
     }
 
     public void writeStruct32(Struct s)
@@ -275,12 +283,10 @@
         }
         else
         {
-            Sizer sizer = sizer();
-            sizer.writeUint16(s.getEncodedType());
-            s.write(sizer);
-            writeUint32(sizer.size());
+            int pos = beginSize32();
             writeUint16(s.getEncodedType());
             s.write(this);
+            endSize32(pos);
         }
     }
 
@@ -338,18 +344,13 @@
 
     public void writeMap(Map<String,Object> map)
     {
-        if (map == null)
+        int pos = beginSize32();
+        if (map != null)
         {
-            writeUint32(0);
-            return;
+            writeUint32(map.size());
+            writeMapEntries(map);
         }
-
-        Sizer sizer = sizer();
-        sizer.writeMap(map);
-        // XXX: - 4
-        writeUint32(sizer.size() - 4);
-        writeUint32(map.size());
-        writeMapEntries(map);
+        endSize32(pos);
     }
 
     protected void writeMapEntries(Map<String,Object> map)
@@ -367,12 +368,13 @@
 
     public void writeList(List<Object> list)
     {
-        Sizer sizer = sizer();
-        sizer.writeList(list);
-        // XXX: - 4
-        writeUint32(sizer.size() - 4);
-        writeUint32(list.size());
-        writeListEntries(list);
+        int pos = beginSize32();
+        if (list != null)
+        {
+            writeUint32(list.size());
+            writeListEntries(list);
+        }
+        endSize32(pos);
     }
 
     protected void writeListEntries(List<Object> list)
@@ -387,16 +389,12 @@
 
     public void writeArray(List<Object> array)
     {
-        if (array == null)
+        int pos = beginSize32();
+        if (array != null)
         {
-            array = Collections.EMPTY_LIST;
+            writeArrayEntries(array);
         }
-
-        Sizer sizer = sizer();
-        sizer.writeArray(array);
-        // XXX: -4
-        writeUint32(sizer.size() - 4);
-        writeArrayEntries(array);
+        endSize32(pos);
     }
 
     protected void writeArrayEntries(List<Object> array)
@@ -458,6 +456,39 @@
         }
     }
 
+    private int beginSize(int width) // open a size slot 'width' bytes wide; returns what the matching beginSizeN returns
+    {
+        switch (width)
+        {
+        case 1:
+            return beginSize8();
+        case 2:
+            return beginSize16();
+        case 4:
+            return beginSize32();
+        default:
+            throw new IllegalStateException("illegal width: " + width); // only widths 1, 2 and 4 are handled
+        }
+    }
+
+    private void endSize(int width, int pos) // close the size slot at 'pos' by dispatching on its width
+    {
+        switch (width)
+        {
+        case 1:
+            endSize8(pos);
+            break;
+        case 2:
+            endSize16(pos);
+            break;
+        case 4:
+            endSize32(pos);
+            break;
+        default:
+            throw new IllegalStateException("illegal width: " + width); // only widths 1, 2 and 4 are handled
+        }
+    }
+
     private void writeBytes(Type t, byte[] bytes)
     {
         writeSize(t, bytes.length);

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBDecoder.java Wed Jul  9 06:26:54 2008
@@ -23,6 +23,8 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
+import org.apache.qpidity.transport.Binary;
+
 
 /**
  * BBDecoder
@@ -33,9 +35,9 @@
 public final class BBDecoder extends AbstractDecoder
 {
 
-    private final ByteBuffer in;
+    private ByteBuffer in;
 
-    public BBDecoder(ByteBuffer in)
+    public void init(ByteBuffer in)
     {
         this.in = in;
         this.in.order(ByteOrder.BIG_ENDIAN);
@@ -51,6 +53,21 @@
         in.get(bytes);
     }
 
+    protected Binary get(int size) // read the next 'size' bytes of 'in' as a Binary
+    {
+        if (in.hasArray())
+        {
+            byte[] bytes = in.array(); // the buffer's backing array itself, not a copy
+            Binary bin = new Binary(bytes, in.arrayOffset() + in.position(), size); // offset of the current read position within that array
+            in.position(in.position() + size); // consume the bytes just handed out
+            return bin;
+        }
+        else
+        {
+            return super.get(size); // no accessible backing array: defer to the superclass implementation
+        }
+    }
+
     public boolean hasRemaining()
     {
         return in.hasRemaining();

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/codec/BBEncoder.java Wed Jul  9 06:26:54 2008
@@ -20,6 +20,7 @@
  */
 package org.apache.qpidity.transport.codec;
 
+import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
@@ -33,47 +34,194 @@
 public final class BBEncoder extends AbstractEncoder
 {
 
-    private final ByteBuffer out;
+    private ByteBuffer out;
 
-    public BBEncoder(ByteBuffer out) {
-        this.out = out;
-        this.out.order(ByteOrder.BIG_ENDIAN);
+    public BBEncoder(int capacity) {
+        out = ByteBuffer.allocate(capacity);
+        out.order(ByteOrder.BIG_ENDIAN);
+    }
+
+    public void init() // reset position and limit so the buffer can be reused; the allocated capacity is kept
+    {
+        out.clear();
+    }
+
+    public ByteBuffer done() // return a newly allocated, exactly-sized copy of everything written since init()
+    {
+        out.flip(); // switch 'out' to reading: position 0, limit = bytes written
+        ByteBuffer encoded = ByteBuffer.allocate(out.remaining());
+        encoded.put(out);
+        encoded.flip(); // make the copy readable from its start
+        return encoded;
+    }
+
+    private void grow(int size) // replace 'out' with a larger buffer that holds everything written so far
+    {
+        ByteBuffer old = out;
+        int capacity = old.capacity();
+        out = ByteBuffer.allocate(Math.max(capacity + size, 2*capacity)); // at least double, and at least 'size' extra bytes
+        out.order(ByteOrder.BIG_ENDIAN);
+        out.put((ByteBuffer) old.flip()); // flip first: put() copies position..limit, which before the flip is the unwritten tail, not the data
     }
 
     protected void doPut(byte b)
     {
-        out.put(b);
+        try
+        {
+            out.put(b);
+        }
+        catch (BufferOverflowException e)
+        {
+            grow(1);
+            out.put(b);
+        }
     }
 
     protected void doPut(ByteBuffer src)
     {
-        out.put(src);
+        try
+        {
+            out.put(src);
+        }
+        catch (BufferOverflowException e)
+        {
+            grow(src.remaining());
+            out.put(src);
+        }
+    }
+
+    protected void put(byte[] bytes) // append all of 'bytes', enlarging the buffer when it does not fit
+    {
+        try
+        {
+            out.put(bytes);
+        }
+        catch (BufferOverflowException e) // not enough room left in 'out'
+        {
+            grow(bytes.length);
+            out.put(bytes); // retry the same write against the enlarged buffer
+        }
     }
 
     public void writeUint8(short b)
     {
         assert b < 0x100;
 
-        out.put((byte) b);
+        try
+        {
+            out.put((byte) b);
+        }
+        catch (BufferOverflowException e)
+        {
+            grow(1);
+            out.put((byte) b);
+        }
     }
 
     public void writeUint16(int s)
     {
         assert s < 0x10000;
 
-        out.putShort((short) s);
+        try
+        {
+            out.putShort((short) s);
+        }
+        catch (BufferOverflowException e)
+        {
+            grow(2);
+            out.putShort((short) s);
+        }
     }
 
     public void writeUint32(long i)
     {
         assert i < 0x100000000L;
 
-        out.putInt((int) i);
+        try
+        {
+            out.putInt((int) i);
+        }
+        catch (BufferOverflowException e)
+        {
+            grow(4);
+            out.putInt((int) i);
+        }
     }
 
     public void writeUint64(long l)
     {
-        out.putLong(l);
+        try
+        {
+            out.putLong(l);
+        }
+        catch (BufferOverflowException e)
+        {
+            grow(8);
+            out.putLong(l);
+        }
+    }
+
+    public int beginSize8() // reserve a one-byte size slot and return its index for endSize8
+    {
+        int pos = out.position();
+        try
+        {
+            out.put((byte) 0); // placeholder; endSize8 overwrites it with the real size
+        }
+        catch (BufferOverflowException e)
+        {
+            grow(1);
+            out.put((byte) 0);
+        }
+        return pos;
+    }
+
+    public void endSize8(int pos) // back-fill the one-byte slot at 'pos' with the number of bytes written after it
+    {
+        int cur = out.position();
+        out.put(pos, (byte) (cur - pos - 1)); // NOTE(review): the byte cast truncates silently if more than 255 bytes follow the slot -- confirm callers bound this
+    }
+
+    public int beginSize16() // reserve a two-byte size slot and return its index for endSize16
+    {
+        int pos = out.position();
+        try
+        {
+            out.putShort((short) 0); // placeholder; endSize16 overwrites it with the real size
+        }
+        catch (BufferOverflowException e)
+        {
+            grow(2);
+            out.putShort((short) 0);
+        }
+        return pos;
+    }
+
+    public void endSize16(int pos) // back-fill the two-byte slot at 'pos' with the number of bytes written after it
+    {
+        int cur = out.position();
+        out.putShort(pos, (short) (cur - pos - 2)); // NOTE(review): the short cast truncates silently if more than 65535 bytes follow the slot -- confirm callers bound this
+    }
+
+    public int beginSize32() // reserve a four-byte size slot and return its index for endSize32
+    {
+        int pos = out.position();
+        try
+        {
+            out.putInt(0); // placeholder; endSize32 overwrites it with the real size
+        }
+        catch (BufferOverflowException e)
+        {
+            grow(4);
+            out.putInt(0);
+        }
+        return pos;
+    }
+
+    public void endSize32(int pos) // back-fill the four-byte slot at 'pos' with the number of bytes written after it
+    {
+        int cur = out.position();
+        out.putInt(pos, (cur - pos - 4)); // absolute write: the current position is left unchanged
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Assembler.java Wed Jul  9 06:26:54 2008
@@ -29,7 +29,6 @@
 
 import org.apache.qpidity.transport.codec.BBDecoder;
 import org.apache.qpidity.transport.codec.Decoder;
-import org.apache.qpidity.transport.codec.FragmentDecoder;
 
 import org.apache.qpidity.transport.ConnectionEvent;
 import org.apache.qpidity.transport.Data;
@@ -52,26 +51,32 @@
 {
 
     private final Receiver<ConnectionEvent> receiver;
-    private final Map<Integer,List<ByteBuffer>> segments;
+    private final Map<Integer,List<Frame>> segments;
+    private final ThreadLocal<BBDecoder> decoder = new ThreadLocal<BBDecoder>() // one BBDecoder per thread
+    {
+        public BBDecoder initialValue() // supplies a thread's decoder the first time that thread calls get()
+        {
+            return new BBDecoder();
+        }
+    };
 
     public Assembler(Receiver<ConnectionEvent> receiver)
     {
         this.receiver = receiver;
-        segments = new HashMap<Integer,List<ByteBuffer>>();
+        segments = new HashMap<Integer,List<Frame>>();
     }
 
     private int segmentKey(Frame frame)
     {
-        // XXX: can this overflow?
         return (frame.getTrack() + 1) * frame.getChannel();
     }
 
-    private List<ByteBuffer> getSegment(Frame frame)
+    private List<Frame> getSegment(Frame frame)
     {
         return segments.get(segmentKey(frame));
     }
 
-    private void setSegment(Frame frame, List<ByteBuffer> segment)
+    private void setSegment(Frame frame, List<Frame> segment)
     {
         int key = segmentKey(frame);
         if (segments.containsKey(key))
@@ -122,7 +127,7 @@
         switch (frame.getType())
         {
         case BODY:
-            emit(frame, new Data(frame, frame.isFirstFrame(),
+            emit(frame, new Data(frame.getBody(), frame.isFirstFrame(),
                                  frame.isLastFrame()));
             break;
         default:
@@ -138,42 +143,54 @@
 
     private void assemble(Frame frame)
     {
-        List<ByteBuffer> segment;
-        if (frame.isFirstFrame())
+        ByteBuffer segment;
+        if (frame.isFirstFrame() && frame.isLastFrame())
         {
-            segment = new ArrayList<ByteBuffer>();
-            setSegment(frame, segment);
+            segment = frame.getBody();
+            emit(frame, decode(frame, segment));
         }
         else
         {
-            segment = getSegment(frame);
-        }
+            List<Frame> frames;
+            if (frame.isFirstFrame())
+            {
+                frames = new ArrayList<Frame>();
+                setSegment(frame, frames);
+            }
+            else
+            {
+                frames = getSegment(frame);
+            }
 
-        for (ByteBuffer buf : frame)
-        {
-            segment.add(buf);
-        }
+            frames.add(frame);
 
-        if (frame.isLastFrame())
-        {
-            clearSegment(frame);
-            emit(frame, decode(frame, frame.getType(), segment));
+            if (frame.isLastFrame())
+            {
+                clearSegment(frame);
+
+                int size = 0;
+                for (Frame f : frames)
+                {
+                    size += f.getSize();
+                }
+                segment = ByteBuffer.allocate(size);
+                for (Frame f : frames)
+                {
+                    segment.put(f.getBody());
+                }
+                segment.flip();
+                emit(frame, decode(frame, segment));
+            }
         }
+
     }
 
-    private ProtocolEvent decode(Frame frame, SegmentType type, List<ByteBuffer> segment)
+    private ProtocolEvent decode(Frame frame, ByteBuffer segment)
     {
-        Decoder dec;
-        if (segment.size() == 1)
-        {
-            dec = new BBDecoder(segment.get(0));
-        }
-        else
-        {
-            dec = new FragmentDecoder(segment.iterator());
-        }
+        BBDecoder dec = decoder.get();
+        dec.init(segment);
 
-        switch (type)
+        switch (frame.getType())
         {
         case CONTROL:
             int controlType = dec.readUint16();
@@ -193,9 +210,9 @@
             {
                 structs.add(dec.readStruct32());
             }
-            return new Header(structs,frame.isLastFrame() && frame.isLastSegment());
+            return new Header(structs, frame.isLastFrame() && frame.isLastSegment());
         default:
-            throw new IllegalStateException("unknown frame type: " + type);
+            throw new IllegalStateException("unknown frame type: " + frame.getType());
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Disassembler.java Wed Jul  9 06:26:54 2008
@@ -21,7 +21,6 @@
 package org.apache.qpidity.transport.network;
 
 import org.apache.qpidity.transport.codec.BBEncoder;
-import org.apache.qpidity.transport.codec.SizeEncoder;
 
 import org.apache.qpidity.transport.ConnectionEvent;
 import org.apache.qpidity.transport.Data;
@@ -54,6 +53,13 @@
 
     private final Sender<NetworkEvent> sender;
     private final int maxPayload;
+    private final ThreadLocal<BBEncoder> encoder = new ThreadLocal<BBEncoder>() // one BBEncoder per thread; parameterised to avoid a raw-type unchecked assignment
+    {
+        public BBEncoder initialValue() // supplies a thread's encoder the first time that thread calls get()
+        {
+            return new BBEncoder(4*1024); // initial buffer capacity in bytes
+        }
+    };
 
     public Disassembler(Sender<NetworkEvent> sender, int maxFrame)
     {
@@ -72,6 +78,11 @@
         event.getProtocolEvent().delegate(event, this);
     }
 
+    public void flush() // forward the flush request to the underlying sender
+    {
+        sender.flush();
+    }
+
     public void close()
     {
         sender.close();
@@ -92,8 +103,7 @@
                 first = false;
             }
             nflags |= LAST_FRAME;
-            Frame frame = new Frame(nflags, type, track, event.getChannel());
-            // frame.addFragment(buf);
+            Frame frame = new Frame(nflags, type, track, event.getChannel(), buf.slice());
             sender.send(frame);
         }
         else
@@ -115,8 +125,7 @@
                     newflags |= LAST_FRAME;
                 }
 
-                Frame frame = new Frame(newflags, type, track, event.getChannel());
-                frame.addFragment(slice);
+                Frame frame = new Frame(newflags, type, track, event.getChannel(), slice);
                 sender.send(frame);
             }
         }
@@ -137,18 +146,18 @@
         method(event, method, SegmentType.COMMAND);
     }
 
-    private void method(ConnectionEvent event, Method method, SegmentType type)
+    private ByteBuffer copy(ByteBuffer src)
     {
-        SizeEncoder sizer = new SizeEncoder();
-        sizer.writeUint16(method.getEncodedType());
-        if (type == SegmentType.COMMAND)
-        {
-            sizer.writeUint16(0);
-        }
-        method.write(sizer);
+        ByteBuffer buf = ByteBuffer.allocate(src.remaining());
+        buf.put(src);
+        buf.flip();
+        return buf;
+    }
 
-        ByteBuffer buf = ByteBuffer.allocate(sizer.size());
-        BBEncoder enc = new BBEncoder(buf);
+    private void method(ConnectionEvent event, Method method, SegmentType type)
+    {
+        BBEncoder enc = encoder.get();
+        enc.init();
         enc.writeUint16(method.getEncodedType());
         if (type == SegmentType.COMMAND)
         {
@@ -162,7 +171,7 @@
             }
         }
         method.write(enc);
-        buf.flip();
+        ByteBuffer buf = enc.done();
 
         byte flags = FIRST_SEG;
 
@@ -176,42 +185,29 @@
 
     public void header(ConnectionEvent event, Header header)
     {
-         ByteBuffer buf;
-        if( header.getBuf() == null)
+        ByteBuffer buf;
+        if (header.getBuf() == null)
         {
-            SizeEncoder sizer = new SizeEncoder();
-            for (Struct st : header.getStructs())
-            {
-                sizer.writeStruct32(st);
-            }
-
-            buf = ByteBuffer.allocate(sizer.size());
-            BBEncoder enc = new BBEncoder(buf);
+            BBEncoder enc = encoder.get();
+            enc.init();
             for (Struct st : header.getStructs())
             {
                 enc.writeStruct32(st);
             }
+            buf = enc.done();
             header.setBuf(buf);
         }
         else
         {
             buf = header.getBuf();
+            buf.flip();
         }
-          buf.flip();
         fragment((byte) 0x0, SegmentType.HEADER, event, buf, true, true);
     }
 
     public void data(ConnectionEvent event, Data data)
     {
-        boolean first = data.isFirst();
-        for (Iterator<ByteBuffer> it = data.getFragments().iterator();
-             it.hasNext(); )
-        {
-            ByteBuffer buf = it.next();
-            boolean last = data.isLast() && !it.hasNext();
-            fragment(LAST_SEG, SegmentType.BODY, event, buf, first, last);
-            first = false;
-        }
+        fragment(LAST_SEG, SegmentType.BODY, event, data.getData(), data.isFirst(), data.isLast());
     }
 
     public void error(ConnectionEvent event, ProtocolError error)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/Frame.java Wed Jul  9 06:26:54 2008
@@ -38,7 +38,7 @@
  * @author Rafael H. Schloming
  */
 
-public final class Frame implements NetworkEvent, Iterable<ByteBuffer>
+public final class Frame implements NetworkEvent
 {
     public static final int HEADER_SIZE = 12;
 
@@ -61,23 +61,21 @@
     final private SegmentType type;
     final private byte track;
     final private int channel;
-    final private List<ByteBuffer> fragments;
-    private int size;
+    final private ByteBuffer body;
 
-    public Frame(byte flags, SegmentType type, byte track, int channel)
+    public Frame(byte flags, SegmentType type, byte track, int channel,
+                 ByteBuffer body)
     {
         this.flags = flags;
         this.type = type;
         this.track = track;
         this.channel = channel;
-        this.size = 0;
-        this.fragments = new ArrayList<ByteBuffer>();
+        this.body = body;
     }
 
-    public void addFragment(ByteBuffer fragment)
+    public ByteBuffer getBody()
     {
-        fragments.add(fragment);
-        size += fragment.remaining();
+        return body.slice();
     }
 
     public byte getFlags()
@@ -92,7 +90,7 @@
 
     public int getSize()
     {
-        return size;
+        return body.remaining();
     }
 
     public SegmentType getType()
@@ -130,16 +128,6 @@
         return flag(LAST_FRAME);
     }
 
-    public Iterator<ByteBuffer> getFragments()
-    {
-        return new SliceIterator(fragments.iterator());
-    }
-
-    public Iterator<ByteBuffer> iterator()
-    {
-        return getFragments();
-    }
-
     public void delegate(NetworkDelegate delegate)
     {
         delegate.frame(this);
@@ -148,26 +136,14 @@
     public String toString()
     {
         StringBuilder str = new StringBuilder();
+
         str.append(String.format
                    ("[%05d %05d %1d %s %d%d%d%d] ", getChannel(), getSize(),
                     getTrack(), getType(),
                     isFirstSegment() ? 1 : 0, isLastSegment() ? 1 : 0,
                     isFirstFrame() ? 1 : 0, isLastFrame() ? 1 : 0));
 
-        boolean first = true;
-        for (ByteBuffer buf : this)
-        {
-            if (first)
-            {
-                first = false;
-            }
-            else
-            {
-                str.append(" | ");
-            }
-
-            str.append(str(buf));
-        }
+        str.append(str(body));
 
         return str.toString();
     }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java?rev=675165&r1=675164&r2=675165&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java Wed Jul  9 06:26:54 2008
@@ -79,7 +79,7 @@
     private byte track;
     private int channel;
     private int size;
-    private Frame frame;
+    private ByteBuffer body;
 
     public InputHandler(Receiver<NetworkEvent> receiver, State state)
     {
@@ -99,9 +99,9 @@
 
     private void frame()
     {
+        Frame frame = new Frame(flags, type, track, channel, body);
         assert size == frame.getSize();
         receiver.received(frame);
-        frame = null;
     }
 
     private void error(String fmt, Object ... args)
@@ -191,30 +191,28 @@
                 return ERROR;
             }
 
-            frame = new Frame(flags, type, track, channel);
             if (size > buf.remaining()) {
-                frame.addFragment(buf.slice());
-                buf.position(buf.limit());
+                body = ByteBuffer.allocate(size);
+                body.put(buf);
                 return FRAME_FRAGMENT;
             } else {
-                ByteBuffer payload = buf.slice();
-                payload.limit(size);
+                body = buf.slice();
+                body.limit(size);
                 buf.position(buf.position() + size);
-                frame.addFragment(payload);
                 frame();
                 return FRAME_HDR;
             }
         case FRAME_FRAGMENT:
-            int delta = size - frame.getSize();
+            int delta = body.remaining();
             if (delta > buf.remaining()) {
-                frame.addFragment(buf.slice());
-                buf.position(buf.limit());
+                body.put(buf);
                 return FRAME_FRAGMENT;
             } else {
                 ByteBuffer fragment = buf.slice();
                 fragment.limit(delta);
                 buf.position(buf.position() + delta);
-                frame.addFragment(fragment);
+                body.put(fragment);
+                body.flip();
                 frame();
                 return FRAME_HDR;
             }