You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/08 20:31:19 UTC

svn commit: r684036 [2/2] - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/handler/ client/src/main/java/org/apache/qpid/client/message/ client/src/main/java/org/apache/qpid/...

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/CloseConsumerMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Fri Aug  8 11:31:18 2008
@@ -66,17 +66,6 @@
     }
 
 
-    JMSTextMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
-    {
-        this(delegateFactory, data, null);
-    }
-
-    JMSTextMessage(AMQMessageDelegateFactory delegateFactory, String text) throws JMSException
-    {
-        super(delegateFactory, (ByteBuffer) null);
-        setText(text);
-    }
-
     public void clearBodyImpl() throws JMSException
     {
         if (_data != null)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Fri Aug  8 11:31:18 2008
@@ -40,7 +40,7 @@
 
      AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
                                       Struct[] contentHeader,
-                                      List bodies)
+                                      java.nio.ByteBuffer body)
         throws JMSException, AMQException;
 
     AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException;

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Fri Aug  8 11:31:18 2008
@@ -23,6 +23,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.nio.ByteBuffer;
 
 import javax.jms.JMSException;
 
@@ -32,6 +33,8 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.transport.Struct;
 import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.DeliveryProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -118,24 +121,30 @@
         }
     }
 
-    public AbstractJMSMessage createMessage(long deliveryTag, boolean redelivered,
-                                            Struct[] contentHeader, List bodies) throws AMQException, JMSException
+    public AbstractJMSMessage createMessage(MessageTransfer transfer) throws AMQException, JMSException
     {
-        MessageProperties mprop = (MessageProperties) contentHeader[0];
+
+        MessageProperties mprop = transfer.getHeader().get(MessageProperties.class);
         String messageType = mprop.getContentType();
         if (messageType == null)
         {
             _logger.debug("no message type specified, building a byte message");
             messageType = JMSBytesMessage.MIME_TYPE;
         }
-        MessageFactory mf = _mimeShortStringToFactoryMap.get(new AMQShortString(messageType));
+        MessageFactory mf = _mimeStringToFactoryMap.get(messageType);
         if (mf == null)
         {
             throw new AMQException(null, "Unsupport MIME type of " + messageType, null);
         }
         else
         {
-            return mf.createMessage(deliveryTag, redelivered, contentHeader, bodies);
+            boolean redelivered = false;
+            DeliveryProperties deliverProps;
+            if((deliverProps = transfer.getHeader().get(DeliveryProperties.class)) != null)
+            {
+                redelivered = deliverProps.getRedelivered();
+            }
+            return mf.createMessage(transfer.getId(), redelivered, transfer.getHeader().getStructs(), transfer.getBody());
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java Fri Aug  8 11:31:18 2008
@@ -7,9 +7,9 @@
     final private AMQShortString  _replyText;
     final private int _replyCode;
 
-    public ReturnMessage(int channelId,AMQShortString exchange,AMQShortString routingKey,AMQShortString replyText,int replyCode)
+    public ReturnMessage(AMQShortString exchange, AMQShortString routingKey, AMQShortString replyText, int replyCode)
     {
-        super(channelId,-1,null,exchange,routingKey,false);
+        super(-1,0,exchange,routingKey,false);
         _replyText = replyText;
         _replyCode = replyCode;
     }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Fri Aug  8 11:31:18 2008
@@ -20,23 +20,7 @@
  */
 package org.apache.qpid.client.message;
 
-import java.util.List;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQChannelException;
-import org.apache.qpid.AMQConnectionException;
-import org.apache.qpid.AMQException;
 import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MethodDispatcher;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 
 /**
@@ -46,217 +30,24 @@
  * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
  * thread in order to minimise the amount of work done in the MINA dispatcher thread.
  */
-public abstract class UnprocessedMessage<H,B>
+public abstract class UnprocessedMessage
 {
-    private final int _channelId;
-    private final long _deliveryId;
-    private final AMQShortString _consumerTag;
-    protected AMQShortString _exchange;
-    protected AMQShortString _routingKey;
-    protected boolean _redelivered;
+    private final int _consumerTag;
+
 
-    public UnprocessedMessage(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+    public UnprocessedMessage(int consumerTag)
     {
-        _channelId = channelId;
-        _deliveryId = deliveryId;
         _consumerTag = consumerTag;
-        _exchange = exchange;
-        _routingKey = routingKey;
-        _redelivered = redelivered;
     }
 
-    public abstract void receiveBody(B nativeMessageBody);
 
-    public abstract void setContentHeader(H nativeMessageHeader);
+    abstract public long getDeliveryTag();
 
-    public int getChannelId()
-    {
-        return _channelId;
-    }
 
-    public long getDeliveryTag()
-    {
-        return _deliveryId;
-    }
-
-    public AMQShortString getConsumerTag()
+    public int getConsumerTag()
     {
         return _consumerTag;
     }
 
-    public AMQShortString getExchange()
-    {
-        return _exchange;
-    }
-
-    public AMQShortString getRoutingKey()
-    {
-        return _routingKey;
-    }
 
-    public boolean isRedelivered()
-    {
-        return _redelivered;
-    }
-    public abstract List<B> getBodies();
-  
-    public abstract H getContentHeader();
- 
-    // specific to 0_10
-    public String getReplyToURL()
-    {
-        return "";
-    }    
-    
-    public static final class CloseConsumerMessage extends UnprocessedMessage
-    {
-        AMQShortString _consumerTag;
-
-        public CloseConsumerMessage(int channelId, long deliveryId, AMQShortString consumerTag,
-                AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
-        {
-            super(channelId, deliveryId, consumerTag, exchange, routingKey, redelivered);
-            _consumerTag = consumerTag;
-        }
-
-        public CloseConsumerMessage(BasicMessageConsumer consumer)
-        {
-            this(0, 0, consumer.getConsumerTag(), null, null, false);            
-        }
-
-        public BasicDeliverBody getDeliverBody()
-        {
-            return new BasicDeliverBody()
-            {
-
-                public AMQShortString getConsumerTag()
-                {
-                    return _consumerTag;
-                }
-
-                public long getDeliveryTag()
-                {
-                    return 0;
-                }
-
-                public AMQShortString getExchange()
-                {
-                    return null;
-                }
-
-                public boolean getRedelivered()
-                {
-                    return false;
-                }
-
-                public AMQShortString getRoutingKey()
-                {
-                    return null;
-                }
-
-                public boolean execute(MethodDispatcher methodDispatcher, int channelId) throws AMQException
-                {
-                    return false;
-                }
-
-                public AMQFrame generateFrame(int channelId)
-                {
-                    return null;
-                }
-
-                public AMQChannelException getChannelException(AMQConstant code, String message)
-                {
-                    return null;
-                }
-
-                public AMQChannelException getChannelException(AMQConstant code, String message, Throwable cause)
-                {
-                    return null;
-                }
-
-                public AMQChannelException getChannelNotFoundException(int channelId)
-                {
-                    return null;
-                }
-
-                public int getClazz()
-                {
-                    return 0;
-                }
-
-                public AMQConnectionException getConnectionException(AMQConstant code, String message)
-                {
-                    return null;
-                }
-
-                public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
-                {
-                    return null;
-                }
-
-                public byte getMajor()
-                {
-                    return 0;
-                }
-
-                public int getMethod()
-                {
-                    return 0;
-                }
-
-                public byte getMinor()
-                {
-                    return 0;
-                }
-
-                public int getSize()
-                {
-                    return 0;
-                }
-
-                public void writeMethodPayload(ByteBuffer buffer)
-                {
-                }
-
-                public void writePayload(ByteBuffer buffer)
-                {
-                }
-
-                public byte getFrameType()
-                {
-                    return 0;
-                }
-
-                public void handle(int channelId, AMQVersionAwareProtocolSession amqMinaProtocolSession)
-                        throws AMQException
-                {
-
-                }
-            };
-        }
-
-        @Override
-        public List getBodies()
-        {
-            return null;
-        }
-
-        @Override
-        public Object getContentHeader()
-        {
-            return null;
-        }
-
-        @Override
-        public void receiveBody(Object nativeMessageBody)
-        {
-            
-        }
-
-        @Override
-        public void setContentHeader(Object nativeMessageHeader)
-        {
-            
-        }
-    }
 }
\ No newline at end of file

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Fri Aug  8 11:31:18 2008
@@ -20,13 +20,7 @@
  */
 package org.apache.qpid.client.message;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.MessageTransfer;
 
 /**
  * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
@@ -35,58 +29,25 @@
  * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
  * thread in order to minimise the amount of work done in the MINA dispatcher thread.
  */
-public class UnprocessedMessage_0_10 extends UnprocessedMessage<Struct[],ByteBuffer>
+public class UnprocessedMessage_0_10 extends UnprocessedMessage
 {
-    private Struct[] _headers;
-    private String _replyToURL;
+    private MessageTransfer _transfer;
 
-    /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
-    private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
-
-    public UnprocessedMessage_0_10(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
-    {
-        super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
-    }
-
-    public void receiveBody(ByteBuffer body)
-    {
-
-        _bodies.add(body);
-    }
-
-    public void setContentHeader(Struct[] headers)
+    public UnprocessedMessage_0_10(int consumerTag, MessageTransfer xfr)
     {
-        this._headers = headers;
-        for(Struct s: headers)
-        {
-            if (s instanceof DeliveryProperties)
-            {
-                DeliveryProperties props = (DeliveryProperties)s;
-                _exchange = new AMQShortString(props.getExchange());
-                _routingKey = new AMQShortString(props.getRoutingKey());
-                _redelivered = props.getRedelivered();
-            }
-        }
-    }
-
-    public Struct[] getContentHeader()
-    {
-        return _headers;
-    }
-
-    public List<ByteBuffer> getBodies()
-    {
-        return _bodies;
+        super(consumerTag);
+        _transfer = xfr;
     }
 
     // additional 0_10 method
-    public String getReplyToURL()
+
+    public long getDeliveryTag()
     {
-        return _replyToURL;
+        return _transfer.getId();
     }
 
-    public void setReplyToURL(String url)
+    public MessageTransfer getMessageTransfer()
     {
-        _replyToURL = url;
+        return _transfer;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Fri Aug  8 11:31:18 2008
@@ -26,7 +26,6 @@
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 
@@ -37,32 +36,54 @@
  * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
  * thread in order to minimise the amount of work done in the MINA dispatcher thread.
  */
-public class UnprocessedMessage_0_8 extends UnprocessedMessage<ContentHeaderBody,ContentBody>
+public class UnprocessedMessage_0_8 extends UnprocessedMessage
 {
     private long _bytesReceived = 0;
 
+
+    private AMQShortString _exchange;
+    private AMQShortString _routingKey;
+    private final long _deliveryId;
+    protected boolean _redelivered;
+
     private BasicDeliverBody _deliverBody;
     private ContentHeaderBody _contentHeader;
 
     /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
     private List<ContentBody> _bodies;
 
-    public UnprocessedMessage_0_8(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+    public UnprocessedMessage_0_8(long deliveryId, int consumerTag, AMQShortString exchange, AMQShortString routingKey, boolean redelivered)
+    {
+        super(consumerTag);
+        _exchange = exchange;
+        _routingKey = routingKey;
+
+        _redelivered = redelivered;
+        _deliveryId = deliveryId;
+    }
+
+
+    public AMQShortString getExchange()
+    {
+        return _exchange;
+    }
+
+    public AMQShortString getRoutingKey()
     {
-        super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
+        return _routingKey;
     }
 
-    public UnprocessedMessage_0_8(int channelId, BasicReturnBody body)
+    public long getDeliveryTag()
     {
-        //FIXME: TGM, SRSLY 4RL
-        super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false);
+        return _deliveryId;
     }
 
-    public UnprocessedMessage_0_8(int channelId, BasicDeliverBody body)
+    public boolean isRedelivered()
     {
-        super(channelId, body.getDeliveryTag(), body.getConsumerTag(), body.getExchange(), body.getRoutingKey(), false);
+        return _redelivered;
     }
 
+
     public void receiveBody(ContentBody body)
     {
 
@@ -124,7 +145,7 @@
     public String toString()
     {
         StringBuilder buf = new StringBuilder();
-        buf.append("Channel Id : " + this.getChannelId());
+
         if (_contentHeader != null)
         {
           buf.append("ContentHeader " + _contentHeader);

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Aug  8 11:31:18 2008
@@ -224,9 +224,8 @@
      *
      * @throws AMQException if this was not expected
      */
-    public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
-    {
-        final int channelId = message.getChannelId();
+    public void unprocessedMessageReceived(final int channelId, UnprocessedMessage message) throws AMQException
+    {        
         if ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
         {
             _channelId2UnprocessedMsgArray[channelId] = message;
@@ -239,8 +238,8 @@
 
     public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
     {
-        final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
-                                       : _channelId2UnprocessedMsgMap.get(channelId);
+        final UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) ((channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+                                               : _channelId2UnprocessedMsgMap.get(channelId));
 
         if (msg == null)
         {
@@ -290,15 +289,7 @@
             throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
         }
 
-        /*try
-        {*/
         msg.receiveBody(contentBody);
-        /*}
-        catch (UnexpectedBodyReceivedException e)
-        {
-            _channelId2UnprocessedMsgMap.remove(channelId);
-            throw e;
-        }*/
 
         if (msg.isAllBodyDataReceived())
         {
@@ -478,7 +469,7 @@
     {
         final AMQSession session = getSession(channelId);
 
-        session.confirmConsumerCancelled(consumerTag);
+        session.confirmConsumerCancelled(consumerTag.toIntValue());
     }
 
     public void setProtocolVersion(final ProtocolVersion pv)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Fri Aug  8 11:31:18 2008
@@ -21,8 +21,10 @@
 package org.apache.qpid.client.util;
 
 import java.util.Iterator;
+import java.util.Queue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * A blocking queue that emits events above a user specified threshold allowing the caller to take action (e.g. flow
@@ -35,7 +37,7 @@
 public class FlowControllingBlockingQueue
 {
     /** This queue is bounded and is used to store messages before being dispatched to the consumer */
-    private final BlockingQueue _queue = new LinkedBlockingQueue();
+    private final Queue _queue = new ConcurrentLinkedQueue();
 
     private final int _flowControlHighThreshold;
     private final int _flowControlLowThreshold;
@@ -71,7 +73,17 @@
 
     public Object take() throws InterruptedException
     {
-        Object o = _queue.take();
+        Object o = _queue.poll();
+        if(o == null)
+        {
+            synchronized(this)
+            {
+                while((o = _queue.poll())==null)
+                {
+                    wait();
+                }
+            }
+        }
         if (_listener != null)
         {
             synchronized (_listener)
@@ -88,7 +100,12 @@
 
     public void add(Object o)
     {
-        _queue.add(o);
+        synchronized(this)
+        {
+            _queue.add(o);
+
+            notifyAll();
+        }
         if (_listener != null)
         {
             synchronized (_listener)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessagePartListener.java Fri Aug  8 11:31:18 2008
@@ -20,6 +20,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageTransfer;
 
 /**
  * Assembles message parts.
@@ -33,31 +34,13 @@
  * are transferred.
  */
 public interface MessagePartListener
-{    
-    /**
-     * Indicates the Message transfer has started.
-     * 
-     * @param transferId The message transfer ID. 
-     */
-    public void messageTransfer(int transferId);
-    
-    /**
-     * Add the following a header to the message being received.
-     *
-     * @param header Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
-     */
-    public void messageHeader(Header header);
+{
 
     /**
-     * Add the following byte array to the content of the message being received
+     * Inform the listener of the message transfer
      *
-     * @param src Data to be added or streamed.
-     */
-    public void data(ByteBuffer src);
-
-    /**
-     * Indicates that the message has been fully received. 
+     * @param xfr the message transfer object
      */
-    public void messageReceived();
+    public void messageTransfer(MessageTransfer xfr);
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSessionDelegate.java Fri Aug  8 11:31:18 2008
@@ -26,15 +26,7 @@
     {
         MessagePartListener listener = ((ClientSession)session).getMessageListeners()
             .get(xfr.getDestination());
-        listener.messageTransfer(xfr.getId());
-        listener.messageHeader(xfr.getHeader());
-        ByteBuffer body = xfr.getBody();
-        if (body == null)
-        {
-            body = ByteBuffer.allocate(0);
-        }
-        listener.data(body);
-        listener.messageReceived();
+        listener.messageTransfer(xfr);
     }
 
     @Override public void messageReject(Session session, MessageReject struct)

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/ByteBufferMessage.java Fri Aug  8 11:31:18 2008
@@ -2,8 +2,7 @@
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.Queue;
+import java.util.*;
 
 import org.apache.qpid.transport.DeliveryProperties;
 import org.apache.qpid.transport.MessageProperties;
@@ -22,7 +21,7 @@
  */
 public class ByteBufferMessage implements Message
 {
-    private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
+    private List<ByteBuffer> _data;// = new ArrayList<ByteBuffer>();
     private ByteBuffer _readBuffer;
     private int _dataSize;
     private DeliveryProperties _currentDeliveryProps;
@@ -76,7 +75,18 @@
      */
     public void appendData(ByteBuffer src) throws IOException
     {
-        _data.offer(src);
+        if(_data == null)
+        {
+            _data = Collections.singletonList(src);
+        }
+        else
+        {
+            if(_data.size() == 1)
+            {
+                _data = new ArrayList<ByteBuffer>(_data);
+            }
+            _data.add(src);
+        }
         _dataSize += src.remaining();
     }
 
@@ -100,12 +110,12 @@
         _currentMessageProps = props;
     }
 
-    public void readData(byte[] target) throws IOException
+    public void readData(byte[] target)
     {
         getReadBuffer().get(target);
     }
 
-    public ByteBuffer readData() throws IOException
+    public ByteBuffer readData()
     {
         return getReadBuffer();
     }
@@ -115,7 +125,7 @@
         //optimize for the simple cases
         if(_data.size() == 1)
         {
-            _readBuffer = _data.element().duplicate();
+            _readBuffer = _data.get(0).duplicate();
         }
         else
         {
@@ -128,7 +138,7 @@
         }
     }
 
-    private ByteBuffer getReadBuffer() throws IOException
+    private ByteBuffer getReadBuffer()
     {
         if (_readBuffer != null )
         {
@@ -143,7 +153,7 @@
             }
             else
             {
-                throw new IOException("No Data to read");
+                return ByteBuffer.allocate(0);
             }
         }
     }
@@ -151,16 +161,9 @@
     //hack for testing
     @Override public String toString()
     {
-        try
-        {
-            ByteBuffer temp = getReadBuffer();
-            byte[] b = new byte[temp.remaining()];
-            temp.get(b);
-            return new String(b);
-        }
-        catch(IOException e)
-        {
-            return "No data";
-        }
+        ByteBuffer temp = getReadBuffer();
+        byte[] b = new byte[temp.remaining()];
+        temp.get(b);
+        return new String(b);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java Fri Aug  8 11:31:18 2008
@@ -3,9 +3,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.qpid.transport.DeliveryProperties;
-import org.apache.qpid.transport.MessageProperties;
-import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.*;
 import org.apache.qpid.nclient.MessagePartListener;
 
 /**
@@ -26,16 +24,35 @@
 		_adaptee = listener;
     }
 
-    public void messageTransfer(int transferId)
+    public void messageTransfer(MessageTransfer xfr)
     {
-        _currentMsg = new ByteBufferMessage(transferId);
-    }
+        _currentMsg = new ByteBufferMessage(xfr.getId());
+
+        for (Struct st : xfr.getHeader().getStructs())
+        {
+            if(st instanceof DeliveryProperties)
+            {
+                _currentMsg.setDeliveryProperties((DeliveryProperties)st);
+
+            }
+            else if(st instanceof MessageProperties)
+            {
+                _currentMsg.setMessageProperties((MessageProperties)st);
+            }
+
+        }
+
+
+        ByteBuffer body = xfr.getBody();
+        if (body == null)
+        {
+            body = ByteBuffer.allocate(0);
+        }
+
 
-    public void data(ByteBuffer src)
-    {
         try
         {
-            _currentMsg.appendData(src);
+            _currentMsg.appendData(body);
         }
         catch(IOException e)
         {
@@ -43,16 +60,7 @@
             // doesn't occur as we are using
             // a ByteBuffer
         }
-    }
 
-    public void messageHeader(Header header)
-    {
-        _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class));
-        _currentMsg.setMessageProperties(header.get(MessageProperties.class));
-    }
-
-    public void messageReceived()
-    {
         _adaptee.onMessage(_currentMsg);
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Fri Aug  8 11:31:18 2008
@@ -31,7 +31,7 @@
 import javax.jms.*;
 import java.util.Map;
 
-public class TestAMQSession extends AMQSession
+public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
 
     public TestAMQSession()
@@ -94,7 +94,7 @@
 
     }
 
-    public BasicMessageConsumer createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable arguments, boolean noConsume, boolean autoClose) throws JMSException
+    public BasicMessageConsumer_0_8 createMessageConsumer(AMQDestination destination, int prefetchHigh, int prefetchLow, boolean noLocal, boolean exclusive, String selector, FieldTable arguments, boolean noConsume, boolean autoClose) throws JMSException
     {
         return null;
     }
@@ -109,12 +109,12 @@
         return false;
     }
 
-    public void sendConsume(BasicMessageConsumer consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, AMQShortString tag) throws AMQException, FailoverException
+    public void sendConsume(BasicMessageConsumer_0_8 consumer, AMQShortString queueName, AMQProtocolHandler protocolHandler, boolean nowait, String messageSelector, int tag) throws AMQException, FailoverException
     {
 
     }
 
-    public BasicMessageProducer createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
+    public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
     {
         return null;
     }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Fri Aug  8 11:31:18 2008
@@ -111,6 +111,8 @@
     private final byte[] _data;
     private final int _offset;
     private int _hashCode;
+    private String _asString = null;
+
     private final int _length;
     private static final char[] EMPTY_CHAR_ARRAY = new char[0];
     
@@ -137,7 +139,7 @@
     public AMQShortString(String data)
     {
         this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
-
+        _asString = data;
     }
 
     public AMQShortString(char[] data)
@@ -418,15 +420,14 @@
         return chars;
     }
 
-    private String str = null;
 
     public String asString()
     {
-        if (str == null)
+        if (_asString == null)
         {
-            str = new String(asChars());
+            _asString = new String(asChars());
         }
-        return str;
+        return _asString;
     }
 
     public boolean equals(Object o)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Header.java Fri Aug  8 11:31:18 2008
@@ -24,6 +24,8 @@
 
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.LinkedHashMap;
 import java.nio.ByteBuffer;
 
 
@@ -35,30 +37,31 @@
 
 public class Header {
 
-    private final List<Struct> structs;
+    private final Struct[] structs;
 
     public Header(List<Struct> structs)
     {
-        this.structs = structs;
+        this(structs.toArray(new Struct[structs.size()]));
     }
 
     public Header(Struct ... structs)
     {
-        this(Arrays.asList(structs));
+        this.structs = structs;
     }
 
-    public List<Struct> getStructs()
+    public Struct[] getStructs()
     {
         return structs;
     }
 
+
     public <T> T get(Class<T> klass)
     {
         for (Struct st : structs)
         {
             if (klass.isInstance(st))
             {
-                return klass.cast(st);
+                return (T) st;
             }
         }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Fri Aug  8 11:31:18 2008
@@ -128,8 +128,14 @@
     {
         int id = nextCommandId();
         cmd.setId(id);
-        log.debug("ID: [%s] %s", this.channel, id);
-        if ((id % 65536) == 0)
+
+        if(log.isDebugEnabled())
+        {
+            log.debug("ID: [%s] %s", this.channel, id);
+        }
+
+        //if ((id % 65536) == 0)
+        if ((id & 0xff) == 0)
         {
             flushProcessed(TIMELY_REPLY);
         }
@@ -232,7 +238,11 @@
 
     boolean complete(int lower, int upper)
     {
-        log.debug("%s complete(%d, %d)", this, lower, upper);
+        //avoid autoboxing
+        if(log.isDebugEnabled())
+        {
+            log.debug("%s complete(%d, %d)", this, lower, upper);
+        }
         synchronized (commands)
         {
             int old = maxComplete;

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Assembler.java Fri Aug  8 11:31:18 2008
@@ -200,7 +200,7 @@
             break;
         case HEADER:
             command = incomplete[channel];
-            List<Struct> structs = new ArrayList();
+            List<Struct> structs = new ArrayList(2);
             while (dec.hasRemaining())
             {
                 structs.add(dec.readStruct32());

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java Fri Aug  8 11:31:18 2008
@@ -34,7 +34,6 @@
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.List;
 
 import static org.apache.qpid.transport.network.Frame.*;
 
@@ -209,11 +208,11 @@
         if (payload)
         {
             final Header hdr = method.getHeader();
-            final List<Struct> structs = hdr.getStructs();
-            final int nstructs = structs.size();
-            for (int i = 0; i < nstructs; i++)
+            final Struct[] structs = hdr.getStructs();
+
+            for (Struct st : structs)
             {
-                enc.writeStruct32(structs.get(i));
+                enc.writeStruct32(st);
             }
             headerSeg = enc.segment();
         }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java?rev=684036&r1=684035&r2=684036&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Logger.java Fri Aug  8 11:31:18 2008
@@ -42,6 +42,11 @@
         this.log = log;
     }
 
+    public boolean isDebugEnabled()
+    {
+        return log.isDebugEnabled();
+    }
+
     public void debug(String message, Object ... args)
     {
         if (log.isDebugEnabled())