You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/28 13:18:14 UTC

svn commit: r631938 [3/5] - in /incubator/qpid/branches/thegreatmerge/qpid: ./ java/ java/broker/ java/broker/src/main/java/org/apache/qpid/server/ java/broker/src/main/java/org/apache/qpid/server/exchange/ java/broker/src/main/java/org/apache/qpid/ser...

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Feb 28 04:16:41 2008
@@ -26,12 +26,7 @@
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
 import org.slf4j.Logger;
@@ -54,13 +49,13 @@
     private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
 
     /** The connection being used by this consumer */
-    protected AMQConnection _connection;
+    protected final AMQConnection _connection;
 
-    private String _messageSelector;
+    private final String _messageSelector;
 
-    private boolean _noLocal;
+    private final boolean _noLocal;
 
-    private AMQDestination _destination;
+    private final AMQDestination _destination;
 
     /**
      * When true indicates that a blocking receive call is in progress
@@ -75,7 +70,7 @@
     protected AMQShortString _consumerTag;
 
     /** We need to know the channel id when constructing frames */
-    protected int _channelId;
+    protected final int _channelId;
 
     /**
      * Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
@@ -83,40 +78,40 @@
      */
     protected final ArrayBlockingQueue _synchronousQueue;
 
-    protected MessageFactoryRegistry _messageFactory;
+    protected final MessageFactoryRegistry _messageFactory;
 
     protected final AMQSession _session;
 
-    protected AMQProtocolHandler _protocolHandler;
+    protected final AMQProtocolHandler _protocolHandler;
 
     /**
      * We need to store the "raw" field table so that we can resubscribe in the event of failover being required
      */
-    private FieldTable _rawSelectorFieldTable;
+    private final FieldTable _rawSelectorFieldTable;
 
     /**
      * We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
      * failover
      */
-    private int _prefetchHigh;
+    private final int _prefetchHigh;
 
     /**
      * We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of
      * failover
      */
-    private int _prefetchLow;
+    private final int _prefetchLow;
 
     /**
      * We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
      */
-    private boolean _exclusive;
+    private final boolean _exclusive;
 
     /**
      * The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
      * consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
      * implementation.
      */
-    private int _acknowledgeMode;
+    private final int _acknowledgeMode;
 
     /**
      * Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
@@ -153,10 +148,10 @@
      * autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
      * on the queue.  This is used for queue browsing.
      */
-    private boolean _autoClose;
+    private final boolean _autoClose;
     private boolean _closeWhenNoMessages;
 
-    private boolean _noConsume;
+    private final boolean _noConsume;
     private List<StackTraceElement> _closedStack = null;
 
     protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
@@ -177,7 +172,7 @@
         _prefetchHigh = prefetchHigh;
         _prefetchLow = prefetchLow;
         _exclusive = exclusive;
-        _acknowledgeMode = acknowledgeMode;
+
         _synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
         _autoClose = autoClose;
         _noConsume = noConsume;
@@ -187,6 +182,10 @@
         {
             _acknowledgeMode = Session.NO_ACKNOWLEDGE;
         }
+        else
+        {
+            _acknowledgeMode = acknowledgeMode;
+        }
     }
 
     public AMQDestination getDestination()
@@ -665,9 +664,8 @@
      * message listener or a synchronous receive() caller.
      *
      * @param messageFrame the raw unprocessed mesage
-     * @param channelId    channel on which this message was sent
      */
-    void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+    void notifyMessage(UnprocessedMessage messageFrame)
     {
         final boolean debug = _logger.isDebugEnabled();
 
@@ -678,12 +676,7 @@
 
         try
         {
-            AbstractJMSMessage jmsMessage =
-                _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
-                                              messageFrame.getExchange(), messageFrame.getRoutingKey(),
-                                              (ContentHeaderBody) messageFrame.getContentHeader(), 
-                                              messageFrame.getBodies());
-
+            AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(messageFrame);
             if (debug)
             {
                 _logger.debug("Message is of type: " + jmsMessage.getClass().getName());
@@ -694,11 +687,9 @@
                 // if (!_closed.get())
                 {
 
-                    jmsMessage.setConsumer(this);
-
                     preDeliver(jmsMessage);
 
-                    notifyMessage(jmsMessage, channelId);
+                    notifyMessage(jmsMessage);
                 }
                 // else
                 // {
@@ -727,9 +718,8 @@
 
     /**
      * @param jmsMessage this message has already been processed so can't redo preDeliver
-     * @param channelId
      */
-    public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
+    public void notifyMessage(AbstractJMSMessage jmsMessage)
     {
         try
         {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Feb 28 04:16:41 2008
@@ -135,7 +135,7 @@
         }
         if (messageOk)
         {
-            super.notifyMessage(jmsMessage, channelId);
+            super.notifyMessage(jmsMessage);
         }
     }
 
@@ -150,7 +150,7 @@
     {
         int channelId = getSession().getChannelId();
         long deliveryId = message.getMessageTransferId();
-        String consumerTag = getConsumerTag().toString();
+        AMQShortString consumerTag = getConsumerTag();
         AMQShortString exchange;
         AMQShortString routingKey;
         boolean redelivered = false;
@@ -264,7 +264,7 @@
             }
             ((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
         }
-        super.notifyMessage(messageFrame, channelId);
+        super.notifyMessage(messageFrame);
     }
 
     public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Feb 28 04:16:41 2008
@@ -553,6 +553,18 @@
         frames[0] = publishFrame;
         frames[1] = contentHeaderFrame;
         CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+        try
+        {
+            _session.checkFlowControl();
+        }
+        catch (InterruptedException e)
+        {
+            JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
+            jmsEx.setLinkedException(e);
+            throw jmsEx;
+        }
+
         _protocolHandler.writeFrame(compositeFrame, wait);
 
         if (message != origMessage)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Thu Feb 28 04:16:41 2008
@@ -21,12 +21,12 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +45,13 @@
         throws AMQException
     {
         final AMQProtocolSession session = stateManager.getProtocolSession();
-        final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(channelId, body);
+        final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
+                channelId,
+                body.getDeliveryTag(),
+                body.getConsumerTag(),
+                body.getExchange(),
+                body.getRoutingKey(),
+                body.getRedelivered());
         _logger.debug("New JmsDeliver method received");
         session.unprocessedMessageReceived(msg);
     }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Thu Feb 28 04:16:41 2008
@@ -21,11 +21,15 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.ReturnMessage;
+import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.BasicReturnBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,16 +44,20 @@
         return _instance;
     }
 
+
     public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, int channelId)
-        throws AMQException
+    throws AMQException
     {
         _logger.debug("New JmsBounce method received");
         final AMQProtocolSession session = stateManager.getProtocolSession();
-        /** FIXME: TGM AS SRSLY 4RL */
-        final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(channelId, body);
+        final ReturnMessage msg = new ReturnMessage(channelId,
+                body.getExchange(),
+                body.getRoutingKey(),
+                body.getReplyText(),
+                body.getReplyCode()
+        );
 
         session.unprocessedMessageReceived(msg);
     }
-
 
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu Feb 28 04:16:41 2008
@@ -43,6 +43,7 @@
 import java.util.Map;
 import java.util.UUID;
 import java.io.IOException;
+import java.net.URISyntaxException;
 
 public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
 {
@@ -56,8 +57,8 @@
     protected boolean _changedData = true;
     private Destination _destination;
     private JMSHeaderAdapter _headerAdapter;
-    private BasicMessageConsumer _consumer;
-    private boolean _strictAMQP;
+    private static final boolean STRICT_AMQP_COMPLIANCE =
+            Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
 
     /**
      * This is 0_10 specific
@@ -101,6 +102,7 @@
         return _010message;
     }
 
+
     protected AbstractJMSMessage(ByteBuffer data)
     {
         super(new BasicContentHeaderProperties());
@@ -115,8 +117,6 @@
         _changedData = (data == null);
         _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
 
-        _strictAMQP =
-            Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
     }
 
     protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
@@ -164,7 +164,10 @@
     {
         if (getContentHeaderProperties().getMessageIdAsString() == null)
         {
-            getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID());
+            StringBuilder b = new StringBuilder(39);
+            b.append("ID");
+            b.append(UUID.randomUUID());
+            getContentHeaderProperties().setMessageId(b.toString());
         }
 
         return getContentHeaderProperties().getMessageIdAsString();
@@ -222,7 +225,7 @@
                     BindingURL binding = new AMQBindingURL(replyToEncoding);
                     dest = AMQDestination.createDestination(binding);
                 }
-                catch (URLSyntaxException e)
+                catch (URISyntaxException e)
                 {
                     throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
                 }
@@ -344,7 +347,7 @@
 
     public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -354,7 +357,7 @@
 
     public boolean getBooleanProperty(String propertyName) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -364,7 +367,7 @@
 
     public byte getByteProperty(String propertyName) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -374,7 +377,7 @@
 
     public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -384,7 +387,7 @@
 
     public short getShortProperty(String propertyName) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -394,7 +397,7 @@
 
     public int getIntProperty(String propertyName) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -404,7 +407,7 @@
 
     public long getLongProperty(String propertyName) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -414,7 +417,7 @@
 
     public float getFloatProperty(String propertyName) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -424,7 +427,7 @@
 
     public double getDoubleProperty(String propertyName) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -434,12 +437,25 @@
 
     public String getStringProperty(String propertyName) throws JMSException
     {
-        if (_strictAMQP)
+	if (propertyName.startsWith("JMSX"))
         {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            // NOTE: if the JMSX property is a non-AMQP property then we must check STRICT_AMQP_COMPLIANCE and throw as below.
+            if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
+            {
+                return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
+            }
+
+            return null;
         }
+        else
+        {
+            if (STRICT_AMQP_COMPLIANCE)
+            {
+                throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+            }
 
-        return getJmsHeaders().getString(propertyName);
+            return getJmsHeaders().getString(propertyName);
+        }
     }
 
     public Object getObjectProperty(String propertyName) throws JMSException
@@ -454,7 +470,7 @@
 
     public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -465,7 +481,7 @@
 
     public void setBooleanProperty(String propertyName, boolean b) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -476,7 +492,7 @@
 
     public void setByteProperty(String propertyName, byte b) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -487,7 +503,7 @@
 
     public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -498,7 +514,7 @@
 
     public void setShortProperty(String propertyName, short i) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -516,7 +532,7 @@
 
     public void setLongProperty(String propertyName, long l) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -527,7 +543,7 @@
 
     public void setFloatProperty(String propertyName, float f) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -538,7 +554,7 @@
 
     public void setDoubleProperty(String propertyName, double v) throws JMSException
     {
-        if (_strictAMQP)
+        if (STRICT_AMQP_COMPLIANCE)
         {
             throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
         }
@@ -732,11 +748,6 @@
         {
             return 0;
         }
-    }
-
-    public void setConsumer(BasicMessageConsumer basicMessageConsumer)
-    {
-        _consumer = basicMessageConsumer;
     }
 
     public void receivedFromServer()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Thu Feb 28 04:16:41 2008
@@ -56,7 +56,11 @@
     JMSMapMessage(ByteBuffer data) throws JMSException
     {
         super(data); // this instantiates a content header
-        populateMapFromData();
+        if(data != null)
+        {
+            populateMapFromData();
+        }
+
     }
 
     JMSMapMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
@@ -77,7 +81,7 @@
 
     public String toBodyString() throws JMSException
     {
-        return _map.toString();
+        return _map == null ? "" : _map.toString();
     }
 
     public AMQShortString getMimeTypeAsShortString()

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java Thu Feb 28 04:16:41 2008
@@ -9,7 +9,7 @@
 
     public ReturnMessage(int channelId,AMQShortString exchange,AMQShortString routingKey,AMQShortString replyText,int replyCode)
     {
-        super(channelId,-1,"",exchange,routingKey,false);
+        super(channelId,-1,null,exchange,routingKey,false);
         _replyText = replyText;
         _replyCode = replyCode;
     }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Thu Feb 28 04:16:41 2008
@@ -36,12 +36,12 @@
 {
     private final int _channelId;
     private final long _deliveryId;
-    private final String _consumerTag;
+    private final AMQShortString _consumerTag;
     protected AMQShortString _exchange;
     protected AMQShortString _routingKey;
     protected boolean _redelivered;
 
-    public UnprocessedMessage(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+    public UnprocessedMessage(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
     {
         _channelId = channelId;
         _deliveryId = deliveryId;
@@ -65,7 +65,7 @@
         return _deliveryId;
     }
 
-    public String getConsumerTag()
+    public AMQShortString getConsumerTag()
     {
         return _consumerTag;
     }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Thu Feb 28 04:16:41 2008
@@ -43,7 +43,7 @@
     /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
     private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
 
-    public UnprocessedMessage_0_10(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+    public UnprocessedMessage_0_10(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
     {
         super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
     }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Thu Feb 28 04:16:41 2008
@@ -47,7 +47,7 @@
     /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
     private List<ContentBody> _bodies;
 
-    public UnprocessedMessage_0_8(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+    public UnprocessedMessage_0_8(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
     {
         super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
     }
@@ -60,7 +60,7 @@
 
     public UnprocessedMessage_0_8(int channelId, BasicDeliverBody body)
     {
-        super(channelId, body.getDeliveryTag(), body.getConsumerTag().toString(), body.getExchange(), body.getRoutingKey(), false);
+        super(channelId, body.getDeliveryTag(), body.getConsumerTag(), body.getExchange(), body.getRoutingKey(), false);
     }
 
     public void receiveBody(ContentBody body)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Feb 28 04:16:41 2008
@@ -361,6 +361,7 @@
                 // this will attemp failover
 
                 sessionClosed(session);
+                _connection.exceptionReceived(cause);
             }
 
             // FIXME Need to correctly handle other exceptions. Things like ...
@@ -392,7 +393,7 @@
      */
     public void propagateExceptionToWaiters(Exception e)
     {
-        getStateManager().error(e);
+        
         if (!_frameListeners.isEmpty())
         {
             final Iterator it = _frameListeners.iterator();
@@ -424,78 +425,7 @@
 
             HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
 
-            switch (bodyFrame.getFrameType())
-            {
-                case AMQMethodBody.TYPE:
-
-                    if (debug)
-                    {
-                        _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
-                    }
-
-                    final AMQMethodEvent<AMQMethodBody> evt =
-                            new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
-
-                    try
-                    {
-
-                        boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
-                        if (!_frameListeners.isEmpty())
-                        {
-                            Iterator it = _frameListeners.iterator();
-                            while (it.hasNext())
-                            {
-                                final AMQMethodListener listener = (AMQMethodListener) it.next();
-                                wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
-                            }
-                        }
-
-                        if (!wasAnyoneInterested)
-                        {
-                            throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"
-                                                   + _frameListeners, null);
-                        }
-                    }
-                    catch (AMQException e)
-                    {
-                        getStateManager().error(e);
-                        if (!_frameListeners.isEmpty())
-                        {
-                            Iterator it = _frameListeners.iterator();
-                            while (it.hasNext())
-                            {
-                                final AMQMethodListener listener = (AMQMethodListener) it.next();
-                                listener.error(e);
-                            }
-                        }
-
-                        exceptionCaught(session, e);
-                    }
-
-                    break;
-
-                case ContentHeaderBody.TYPE:
-
-                    _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
-                    break;
-
-                case ContentBody.TYPE:
-
-                    _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
-                    break;
-
-                case HeartbeatBody.TYPE:
-
-                    if (debug)
-                    {
-                        _logger.debug("Received heartbeat");
-                    }
-
-                    break;
-
-                default:
-
-            }
+            bodyFrame.handle(frame.getChannel(),_protocolSession);
 
             _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
         }
@@ -511,6 +441,55 @@
             // get round a bug in old versions of qpid whereby the connection is not closed
             _stateManager.changeState(AMQState.CONNECTION_CLOSED);
         }
+    }
+
+    /**
+     * Dispatches a received method body to the state manager and then to every registered frame listener.
+     *
+     * <p>If neither the state manager nor any listener reports the event as handled, an AMQException is
+     * raised internally. That exception, like any AMQException thrown while dispatching, is passed to the
+     * <tt>error</tt> callback of each frame listener and finally to <tt>exceptionCaught</tt>.
+     *
+     * @param channelId the channel the method body arrived on
+     * @param bodyFrame the received body; it is cast to AMQMethodBody without a prior type check
+     * @param session   the IoSession handed to <tt>exceptionCaught</tt> when dispatch fails
+     *
+     * @throws AMQException declared on the signature; AMQExceptions raised by the state manager or the
+     *                      listeners are caught here and routed to the error path
+     */
+    public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)
+            throws AMQException
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame);
+        }
+
+        final AMQMethodEvent<AMQMethodBody> event =
+                new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
+
+        try
+        {
+            // The state manager always sees the event first.
+            boolean handled = getStateManager().methodReceived(event);
+
+            if (!_frameListeners.isEmpty())
+            {
+                for (Iterator it = _frameListeners.iterator(); it.hasNext();)
+                {
+                    final AMQMethodListener listener = (AMQMethodListener) it.next();
+                    // Every listener is invoked, even when the event has already been handled.
+                    if (listener.methodReceived(event))
+                    {
+                        handled = true;
+                    }
+                }
+            }
+
+            if (!handled)
+            {
+                throw new AMQException(null, "AMQMethodEvent " + event + " was not processed by any listener.  Listeners:"
+                                       + _frameListeners, null);
+            }
+        }
+        catch (AMQException e)
+        {
+            // Tell every listener about the failure before handing it to the generic exception path.
+            if (!_frameListeners.isEmpty())
+            {
+                for (Iterator it = _frameListeners.iterator(); it.hasNext();)
+                {
+                    ((AMQMethodListener) it.next()).error(e);
+                }
+            }
+
+            exceptionCaught(session, e);
+        }
+    }
 
     private static int _messagesOut;

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Feb 28 04:16:41 2008
@@ -86,7 +86,8 @@
      * Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
      * first) with the subsequent content header and content bodies.
      */
-    protected ConcurrentMap<Integer,UnprocessedMessage_0_8> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage_0_8>();
+    private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>();
+    private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
 
     /** Counter to ensure unique queue names */
     protected int _queueId = 1;
@@ -104,7 +105,8 @@
     private MethodDispatcher _methodDispatcher;
 
 
-    private final AMQConnection _connection;    
+    private final AMQConnection _connection;
+    private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
     {
@@ -233,14 +235,25 @@
      *
      * @throws AMQException if this was not expected
      */
-    public void unprocessedMessageReceived(UnprocessedMessage_0_8 message) throws AMQException
+    public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
     {
-        _channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
+        final int channelId = message.getChannelId();
+        if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+        {
+            _channelId2UnprocessedMsgArray[channelId] = message;    
+        }
+        else
+        {
+            _channelId2UnprocessedMsgMap.put(channelId, message);
+        }
     }
 
-    public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
+    public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
     {
-        UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId);
+        final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+                                                               : _channelId2UnprocessedMsgMap.get(channelId);
+
+
         if (msg == null)
         {
             throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null);
@@ -258,9 +271,19 @@
         }
     }
 
-    public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
+    public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException
     {
-        UnprocessedMessage_0_8 msg = _channelId2UnprocessedMsgMap.get(channelId);
+        UnprocessedMessage_0_8 msg;
+        final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
+        if(fastAccess)
+        {
+            msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId];
+        }
+        else
+        {
+            msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId);
+        }
+
         if (msg == null)
         {
             throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null);
@@ -268,7 +291,14 @@
 
         if (msg.getContentHeader() == null)
         {
-            _channelId2UnprocessedMsgMap.remove(channelId);
+            if(fastAccess)
+            {
+                _channelId2UnprocessedMsgArray[channelId] = null;
+            }
+            else
+            {
+                _channelId2UnprocessedMsgMap.remove(channelId);
+            }
             throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
         }
 
@@ -288,6 +318,11 @@
         }
     }
 
+    /**
+     * Callback for a received heartbeat body. The implementation is deliberately empty: this session
+     * performs no work when a heartbeat arrives.
+     *
+     * @param channelId the channel the heartbeat arrived on (unused)
+     * @param body      the heartbeat body (unused)
+     *
+     * @throws AMQException declared on the signature; nothing in this implementation throws it
+     */
+    public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
+    {
+
+    }
+
     /**
      * Deliver a message to the appropriate session, removing the unprocessed message from our map
      *
@@ -298,7 +333,14 @@
     {
         AMQSession session = getSession(channelId);
         session.messageReceived(msg);
-        _channelId2UnprocessedMsgMap.remove(channelId);
+        if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+        {
+            _channelId2UnprocessedMsgArray[channelId] = null;
+        }
+        else
+        {
+            _channelId2UnprocessedMsgMap.remove(channelId);
+        }
     }
 
     protected AMQSession getSession(int channelId)
@@ -488,10 +530,19 @@
         final AMQSession session = getSession(channelId);
         session.setTicket(ticket);
     }
-
-
     public void setMethodDispatcher(MethodDispatcher methodDispatcher)
     {
         _methodDispatcher = methodDispatcher;
+    }
+
+    /**
+     * Forwards a flow-control flag to the session registered for the given channel.
+     *
+     * @param channelId the channel whose session is looked up via <tt>getSession</tt>
+     * @param active    the flag passed unchanged to the session's <tt>setFlowControl</tt>
+     */
+    public void setFlowControl(final int channelId, final boolean active)
+    {
+        getSession(channelId).setFlowControl(active);
+    }
+
+    /**
+     * Hands a received method body to the protocol handler, together with this session's MINA session.
+     *
+     * @param channel       the channel the method body arrived on
+     * @param amqMethodBody the method body to dispatch
+     *
+     * @throws AMQException if the protocol handler's <tt>methodBodyReceived</tt> throws it
+     */
+    public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+    {
+        _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Feb 28 04:16:41 2008
@@ -36,7 +36,7 @@
  * The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
  * there is a separate state manager.
  */
-public class AMQStateManager implements AMQMethodListener
+public class AMQStateManager 
 {
     private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
 
@@ -51,7 +51,7 @@
      * AMQFrame.
      */
 
-    private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+
     private final Object _stateLock = new Object();
     private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
 
@@ -90,19 +90,6 @@
         }
     }
 
-    public void error(Exception e)
-    {
-        _logger.debug("State manager receive error notification: " + e);
-        synchronized (_stateListeners)
-        {
-            final Iterator it = _stateListeners.iterator();
-            while (it.hasNext())
-            {
-                final StateListener l = (StateListener) it.next();
-                l.error(e);
-            }
-        }
-    }
 
     public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
     {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Thu Feb 28 04:16:41 2008
@@ -85,7 +85,7 @@
             throw new AMQNoTransportForProtocolException(details, null, null);
         }
 
-        if (transport == _currentInstance)
+       /* if (transport == _currentInstance)
         {
             if (transport == VM)
             {
@@ -100,21 +100,23 @@
             }
         }
 
-        _currentInstance = transport;
+        _currentInstance = transport;*/
 
+        ITransportConnection instance;
         switch (transport)
         {
             case SOCKET:
-                _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
-                {
-                    public IoConnector newSocketConnector()
-                    {
-                        return new ExistingSocketConnector();
-                    }
-                });
+                instance =
+                        new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+                        {
+                            public IoConnector newSocketConnector()
+                            {
+                                return new ExistingSocketConnector();
+                            }
+                        });
                 break;
             case TCP:
-                _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+                instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
                 {
                     public IoConnector newSocketConnector()
                     {
@@ -142,12 +144,15 @@
                 break;
             case VM:
             {
-                _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+                instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
                 break;
             }
+            default:
+                // FIXME: TGM
+                throw new AMQNoTransportForProtocolException(details, null, null);
         }
 
-        return _instance;
+        return instance;
     }
 
     private static int getTransport(String transport)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java Thu Feb 28 04:16:41 2008
@@ -34,7 +34,6 @@
     private static final long MINUTE = 60000L;
 
     private static final long DEFAULT_METHOD_TIMEOUT = 1 * MINUTE;
-    private static final long DEFAULT_FAILOVER_TIMEOUT = 4 * MINUTE;
 
     private FailoverMethod[] _methods = new FailoverMethod[1];
 
@@ -161,16 +160,7 @@
             }
             else
             {
-                if ((now - _lastFailTime) >= DEFAULT_FAILOVER_TIMEOUT)
-                {
-                    _logger.info("Failover timeout");
-
-                    return false;
-                }
-                else
-                {
-                    _lastMethodTime = now;
-                }
+                _lastMethodTime = now;
             }
         }
         else

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Thu Feb 28 04:16:41 2008
@@ -45,6 +45,7 @@
 import java.io.BufferedInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.net.URISyntaxException;
 import java.util.Hashtable;
 import java.util.Iterator;
 import java.util.Map;
@@ -216,7 +217,7 @@
         {
             binding = new AMQBindingURL(bindingURL);
         }
-        catch (URLSyntaxException urlse)
+        catch (URISyntaxException urlse)
         {
             _logger.warn("Unable to destination:" + urlse);
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java Thu Feb 28 04:16:41 2008
@@ -165,6 +165,22 @@
         }
     }
 
+    /**
+     * Checks that opening a connection whose URL names the virtual host 'rubbishhost' (presumably one
+     * the test broker does not define) with retries set to 0 fails with an AMQConnectionFailureException.
+     * Any other AMQException, or a successful connection, fails the test.
+     */
+    public void testUnresolvedVirtualHostFailure() throws Exception
+    {
+        try
+        {
+            new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + _broker + "?retries='0''");
+            fail("Connection should not be established");
+        }
+        catch (AMQException amqe)
+        {
+            // Only the connection-failure subtype is acceptable; report anything else with its details.
+            if (!(amqe instanceof AMQConnectionFailureException))
+            {
+                fail("Correct exception not thrown. Expected 'AMQConnectionFailureException' got: " + amqe);
+            }
+        }
+    }
+
     public void testClientIdCannotBeChanged() throws Exception
     {
         Connection connection = new AMQConnection(_broker, "guest", "guest",

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java Thu Feb 28 04:16:41 2008
@@ -29,8 +29,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import uk.co.thebadgerset.junit.concurrency.TestRunnable;
-import uk.co.thebadgerset.junit.concurrency.ThreadTestCoordinator;
+import org.apache.qpid.junit.concurrency.TestRunnable;
+import org.apache.qpid.junit.concurrency.ThreadTestCoordinator;
 
 import javax.jms.Connection;
 import javax.jms.Message;

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java Thu Feb 28 04:16:41 2008
@@ -1,40 +1,47 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid;
-
-/**
- * AMQConnectionFailureException indicates that a connection to a broker could not be formed.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents failure to connect to a broker.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- */
-public class AMQConnectionFailureException extends AMQException
-{
-    public AMQConnectionFailureException(String message, Throwable cause)
-    {
-        super(null, message, cause);
-    }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid;
+
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * AMQConnectionFailureException indicates that a connection to a broker could not be formed.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to connect to a broker.
+ * </table>
+ *
+ * @todo The (message, cause) constructor still creates the exception without a status code.
+ */
+public class AMQConnectionFailureException extends AMQException
+{
+    /**
+     * Creates a connection failure that carries no error code.
+     *
+     * @param message description of the failure
+     * @param cause   the underlying cause, passed through to the superclass
+     */
+    public AMQConnectionFailureException(String message, Throwable cause)
+    {
+        this(null, message, cause);
+    }
+
+    /**
+     * Creates a connection failure with an explicit error code.
+     *
+     * @param errorCode the error code passed to the superclass constructor
+     * @param message   description of the failure
+     * @param cause     the underlying cause, passed through to the superclass
+     */
+    public AMQConnectionFailureException(AMQConstant errorCode, String message, Throwable cause)
+    {
+        super(errorCode, message, cause);
+    }
+}

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Thu Feb 28 04:16:41 2008
@@ -22,6 +22,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
 import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 
@@ -48,6 +49,9 @@
  */
 public class AMQDecoder extends CumulativeProtocolDecoder
 {
+
+    private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+
     /** Holds the 'normal' AMQP data decoder. */
     private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
 
@@ -171,4 +175,97 @@
     {
         _expectProtocolInitiation = expectProtocolInitiation;
     }
+
+
+    /**
+     * Combines the content of <tt>in</tt> with any bytes left over from a previous call and forwards
+     * the decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt> or the buffer has
+     * been fully consumed. Bytes that are still undecoded afterwards are copied into a new buffer
+     * stored on the session; if nothing is left, any stored session buffer is released and removed.
+     *
+     * @param session the session whose BUFFER attribute holds leftover bytes between calls
+     * @param in      the newly received data
+     * @param out     the output handed on to <tt>doDecode()</tt>
+     *
+     * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+     *                               <tt>true</tt> not consuming the cumulative buffer.
+     */
+    public void decode( IoSession session, ByteBuffer in,
+                        ProtocolDecoderOutput out ) throws Exception
+    {
+        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+        // if we have a session buffer, append data to that otherwise
+        // use the buffer read from the network directly
+        if( buf != null )
+        {
+            buf.put( in );
+            // switch the session buffer from appending to reading
+            buf.flip();
+        }
+        else
+        {
+            buf = in;
+        }
+
+        for( ;; )
+        {
+            int oldPos = buf.position();
+            boolean decoded = doDecode( session, buf, out );
+            if( decoded )
+            {
+                // a successful decode must have advanced the position, otherwise this loop would never end
+                if( buf.position() == oldPos )
+                {
+                    throw new IllegalStateException(
+                            "doDecode() can't return true when buffer is not consumed." );
+                }
+
+                if( !buf.hasRemaining() )
+                {
+                    break;
+                }
+            }
+            else
+            {
+                break;
+            }
+        }
+
+        // if there is any data left that cannot be decoded, we store
+        // it in a buffer in the session and next time this decoder is
+        // invoked the session buffer gets appended to
+        // NOTE(review): when buf is itself the previous session buffer, it is replaced by the new copy
+        // without release() being called on it - confirm that this is intended.
+        if ( buf.hasRemaining() )
+        {
+            storeRemainingInSession( buf, session );
+        }
+        else
+        {
+            removeSessionBuffer( session );
+        }
+    }
+
+    /**
+     * Releases the cumulative buffer used by the specified <tt>session</tt>, i.e. the buffer stored
+     * under the BUFFER attribute, and removes that attribute.
+     * Please don't forget to call <tt>super.dispose( session )</tt> when
+     * you override this method.
+     *
+     * @param session the session whose leftover buffer is discarded
+     */
+    public void dispose( IoSession session ) throws Exception
+    {
+        removeSessionBuffer( session );
+    }
+
+    /**
+     * Releases the buffer stored under the BUFFER attribute of the session, if there is one, and
+     * removes the attribute. Does nothing when no buffer is stored.
+     *
+     * @param session the session to clean up
+     */
+    private void removeSessionBuffer(IoSession session)
+    {
+        final ByteBuffer stored = ( ByteBuffer ) session.getAttribute( BUFFER );
+        if( stored == null )
+        {
+            return;
+        }
+
+        stored.release();
+        session.removeAttribute( BUFFER );
+    }
+
+    private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
+
+    /**
+     * Copies the unread bytes of <tt>buf</tt> into a newly allocated, auto-expanding buffer and stores
+     * that buffer under the BUFFER attribute of the session. The stored buffer is left positioned after
+     * the copied bytes, so that later data can be appended to it.
+     *
+     * @param buf     the buffer whose remaining bytes are preserved
+     * @param session the session that keeps the leftover bytes until the next decode call
+     */
+    private void storeRemainingInSession(ByteBuffer buf, IoSession session)
+    {
+        final int leftover = buf.remaining();
+        final ByteBuffer carryOver = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( leftover, false );
+        carryOver.setAutoExpand( true );
+        carryOver.put( buf );
+
+        session.setAttribute( BUFFER, carryOver );
+    }
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Thu Feb 28 04:16:41 2008
@@ -21,6 +21,8 @@
 package org.apache.qpid.framing;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
 
 public interface AMQBody
 {
@@ -36,4 +38,6 @@
     
     //public void populateFromBuffer(ByteBuffer buffer, long size)
     //    throws AMQFrameDecodingException, AMQProtocolVersionException;        
+
+    void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Thu Feb 28 04:16:41 2008
@@ -27,7 +27,7 @@
     private final int _channel;
 
     private final AMQBody _bodyFrame;
-
+    public static final byte FRAME_END_BYTE = (byte) 0xCE;
 
 
     public AMQFrame(final int channel, final AMQBody bodyFrame)
@@ -47,13 +47,19 @@
         return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
     }
 
+    /**
+     * Returns the number of bytes a frame occupies in addition to its body.
+     *
+     * <p>The terms follow the order in which <tt>writePayload</tt> emits the non-body parts of a frame:
+     * the frame type byte, the channel (written as an unsigned short), the body size (written as an
+     * unsigned integer) and the frame end byte.
+     *
+     * @return the fixed per-frame overhead, 1 + 2 + 4 + 1
+     */
+    public static final int getFrameOverhead()
+    {
+        return 1 + 2 + 4 + 1;
+    }
+
+
     public void writePayload(ByteBuffer buffer)
     {
         buffer.put(_bodyFrame.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, _channel);
         EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
         _bodyFrame.writePayload(buffer);
-        buffer.put((byte) 0xCE);
+        buffer.put(FRAME_END_BYTE);
     }
 
     public final int getChannel()
@@ -66,10 +72,54 @@
         return _bodyFrame;
     }
 
-
-
     public String toString()
     {
         return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
     }
+
+    /** Writes one complete frame: type byte, channel, body size, body payload, then FRAME_END_BYTE. */
+    public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
+    {
+        buffer.put(body.getFrameType());
+        EncodingUtils.writeUnsignedShort(buffer, channel);
+        EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
+        body.writePayload(buffer);
+        buffer.put(FRAME_END_BYTE);
+    }
+
+    /**
+     * Writes two complete frames for the same channel, body1 first and body2 directly after it.
+     *
+     * @param buffer  destination buffer
+     * @param channel channel number written into both frame headers
+     * @param body1   body of the first frame
+     * @param body2   body of the second frame
+     */
+    public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
+    {
+        writeFrame(buffer, channel, body1);
+        writeFrame(buffer, channel, body2);
+    }
+
+
+    /**
+     * Writes three complete frames for the same channel, in argument order, with nothing between them.
+     *
+     * @param buffer  destination buffer
+     * @param channel channel number written into every frame header
+     * @param body1   body of the first frame
+     * @param body2   body of the second frame
+     * @param body3   body of the third frame
+     */
+    public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
+    {
+        // Every frame carries its own header and end byte, so frames are simply
+        // appended one after another; writeFrame emits that per-frame layout.
+        writeFrame(buffer, channel, body1);
+        writeFrame(buffer, channel, body2);
+        writeFrame(buffer, channel, body3);
+    }
+
+
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Thu Feb 28 04:16:41 2008
@@ -24,7 +24,9 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 public abstract class AMQMethodBodyImpl implements AMQMethodBody
 {
@@ -84,6 +86,11 @@
     public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
     {
         return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
+    }
+    /** Passes this method body, together with channelId, to the session's methodFrameReceived callback. */
+    public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException
+    {
+        session.methodFrameReceived(channelId, this);
     }
 
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Thu Feb 28 04:16:41 2008
@@ -26,8 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.*;
 import java.lang.ref.WeakReference;
 
 /**
@@ -38,6 +37,62 @@
  */
 public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
 {
+    private static final byte MINUS = (byte)'-';
+    private static final byte ZERO = (byte) '0';
+
+
+
+    /**
+     * Splits the enclosing string on one delimiter byte; adjacent delimiters yield empty tokens.
+     */
+    private final class TokenizerImpl implements AMQShortStringTokenizer
+    {
+        private final byte _delim;
+        private int _count = -1;   // -1 until countTokens() has computed it
+        private int _pos = 0;      // start of the next token, relative to the enclosing string
+
+        public TokenizerImpl(final byte delim)
+        {
+            _delim = delim;
+        }
+
+        /** Returns the total token count (delimiter occurrences + 1), computed on first use. */
+        public int countTokens()
+        {
+            if(_count != -1)
+            {
+                return _count;
+            }
+            _count = AMQShortString.this.occurences(_delim) + 1;
+            return _count;
+        }
+
+        /** Returns the next token, or null when every token has already been returned. */
+        public AMQShortString nextToken()
+        {
+            final int limit = AMQShortString.this.length();
+            if(_pos > limit)
+            {
+                return null;
+            }
+            final int found = AMQShortString.this.indexOf(_delim, _pos);
+            final int tokenEnd = (found == -1) ? limit : found;
+            final AMQShortString token = AMQShortString.this.substring(_pos, tokenEnd);
+            _pos = tokenEnd + 1;   // skip the delimiter; lands past the end after the final token
+            return token;
+        }
+
+        public boolean hasMoreTokens()
+        {
+            return AMQShortString.this.length() >= _pos;
+        }
+    }
+
+    /** Returns the [from, to) slice of this string; from/to are relative to this string, hence + _offset. */
+    private AMQShortString substring(final int from, final int to)
+    {
+        return new AMQShortString(_data, _offset + from, _offset + to);
+    }
 
     private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
             new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
@@ -53,7 +108,8 @@
 
     private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
 
-    private final ByteBuffer _data;
+    private final byte[] _data;
+    private final int _offset;
     private int _hashCode;
     private final int _length;
     private static final char[] EMPTY_CHAR_ARRAY = new char[0];
@@ -65,17 +121,25 @@
     public AMQShortString(byte[] data)
     {
 
-        _data = ByteBuffer.wrap(data);
+        _data = data.clone();
         _length = data.length;
+        _offset = 0;
+    }
+
+    /** Decodes a length-prefixed short string from data at pos, copying the payload into a new array. */
+    public AMQShortString(byte[] data, int pos)
+    {
+        final int size = data[pos++] & 0xFF;   // length octet is unsigned (0-255); a bare byte goes negative above 127
+        _data = new byte[size];
+        System.arraycopy(data, pos, _data, 0, size);
+        _length = size;
+        _offset = 0;
     }
 
     public AMQShortString(String data)
     {
         this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
-        if (data != null)
-        {
-            _hashCode = data.hashCode();
-        }
+
     }
 
     public AMQShortString(char[] data)
@@ -87,14 +151,17 @@
 
         final int length = data.length;
         final byte[] stringBytes = new byte[length];
+        int hash = 0;
         for (int i = 0; i < length; i++)
         {
             stringBytes[i] = (byte) (0xFF & data[i]);
+            hash = (31 * hash) + stringBytes[i];
         }
+        _hashCode = hash;
+        _data = stringBytes;
 
-        _data = ByteBuffer.wrap(stringBytes);
-        _data.rewind();
         _length = length;
+        _offset = 0;
 
     }
 
@@ -110,20 +177,31 @@
 
         }
 
-        _data = ByteBuffer.wrap(stringBytes);
-        _data.rewind();
+        _data = stringBytes;
         _hashCode = hash;
         _length = length;
+        _offset = 0;
 
     }
 
-    private AMQShortString(ByteBuffer data)
+    private AMQShortString(ByteBuffer data, final int length)
     {
-        _data = data;
-        _length = data.limit();
+        byte[] dataBytes = new byte[length];
+        data.get(dataBytes);
+        _data = dataBytes;
+        _length = length;
+        _offset = 0;
 
     }
 
+    /** Creates a view of data[from, to): the array is shared, not copied; from/to are absolute array indices. */
+    private AMQShortString(final byte[] data, final int from, final int to)
+    {
+        _offset = from;
+        _length = to - from;
+        _data = data;
+    }
+
     /**
      * Get the length of the short string
      * @return length of the underlying byte array
@@ -136,7 +214,7 @@
     public char charAt(int index)
     {
 
-        return (char) _data.get(index);
+        return (char) _data[_offset + index];
 
     }
 
@@ -148,27 +226,24 @@
     public int writeToByteArray(byte[] encoding, int pos)
     {
         final int size = length();
-        encoding[pos++] = (byte) length();
-        for (int i = 0; i < size; i++)
-        {
-            encoding[pos++] = _data.get(i);
-        }
-
-        return pos;
+        encoding[pos++] = (byte) size;
+        System.arraycopy(_data,_offset,encoding,pos,size);
+        return pos+size;
     }
 
     public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
     {
 
-        final byte len = byteEncodedDestination[pos];
-        if (len == 0)
+
+        final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
+        if(shortString.length() == 0)
         {
             return null;
         }
-
-        ByteBuffer data = ByteBuffer.wrap(byteEncodedDestination, pos + 1, len).slice();
-
-        return new AMQShortString(data);
+        else
+        {
+            return shortString;
+        }
     }
 
     public static AMQShortString readFromBuffer(ByteBuffer buffer)
@@ -180,89 +255,58 @@
         }
         else
         {
-            ByteBuffer data = buffer.slice();
-            data.limit(length);
-            data.rewind();
-            buffer.skip(length);
 
-            return new AMQShortString(data);
+            return new AMQShortString(buffer, length);
         }
     }
 
     public byte[] getBytes()
     {
-
-        if (_data.buf().hasArray() && (_data.arrayOffset() == 0))
+        if(_offset == 0 && _length == _data.length)
         {
-            return _data.array();
+            return _data.clone();
         }
         else
         {
-            final int size = length();
-            byte[] b = new byte[size];
-            ByteBuffer buf = _data.duplicate();
-            buf.rewind();
-            buf.get(b);
-
-            return b;
+            byte[] data = new byte[_length];
+            System.arraycopy(_data,_offset,data,0,_length);
+            return data;
         }
-
     }
 
     public void writeToBuffer(ByteBuffer buffer)
     {
 
         final int size = length();
-        if (size != 0)
-        {
-
-            buffer.put((byte) size);
-            if (_data.buf().hasArray())
-            {
-                buffer.put(_data.array(), _data.arrayOffset(), length());
-            }
-            else
-            {
-
-                for (int i = 0; i < size; i++)
-                {
-
-                    buffer.put(_data.get(i));
-                }
-            }
-        }
-        else
-        {
-            // really writing out unsigned byte
-            buffer.put((byte) 0);
-        }
-
+        //buffer.setAutoExpand(true);
+        buffer.put((byte) size);
+        buffer.put(_data, _offset, size);
     }
 
     private final class CharSubSequence implements CharSequence
     {
-        private final int _offset;
+        private final int _sequenceOffset;
         private final int _end;
 
         public CharSubSequence(final int offset, final int end)
         {
-            _offset = offset;
+            _sequenceOffset = offset;
             _end = end;
         }
 
         public int length()
         {
-            return _end - _offset;
+            return _end - _sequenceOffset;
         }
 
         public char charAt(int index)
         {
-            return AMQShortString.this.charAt(index + _offset);
+            return AMQShortString.this.charAt(index + _sequenceOffset);
         }
 
         public CharSequence subSequence(int start, int end)
         {
-            return new CharSubSequence(start + _offset, end + _offset);
+            return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset);
         }
     }
 
@@ -273,12 +317,11 @@
             final int size = length();
             chars = new char[size];
 
-            for (int i = 0; i < size; i++)
-            {
-                chars[i] = (char) _data.get(i);
+	    for (int i = 0; i < size; i++)
+	    {
+                chars[i] = (char) _data[i + _offset];
             }
-        }
-
+	}
         return chars;
     }
 
@@ -294,6 +337,17 @@
 
     public boolean equals(Object o)
     {
+
+
+        if(o instanceof AMQShortString)
+        {
+            return equals((AMQShortString)o);
+        }
+        if(o instanceof CharSequence)
+        {
+            return equals((CharSequence)o);
+        }
+
         if (o == null)
         {
             return false;
@@ -304,26 +358,40 @@
             return true;
         }
 
-        if (o instanceof AMQShortString)
-        {
 
-            final AMQShortString otherString = (AMQShortString) o;
+        return false;
 
-            if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
-            {
-                return false;
-            }
+    }
 
-            return _data.equals(otherString._data);
+    public boolean equals(final AMQShortString otherString)
+    {
+        if (otherString == this)
+        {
+            return true;
+        }
+
+        if (otherString == null)
+        {
+            return false;
+        }
 
+        if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+        {
+            return false;
         }
 
-        return (o instanceof CharSequence) && equals((CharSequence) o);
+        return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
+                || Arrays.equals(getBytes(),otherString.getBytes());
 
     }
 
     public boolean equals(CharSequence s)
     {
+        if(s instanceof AMQShortString)
+        {
+            return equals((AMQShortString)s);
+        }
+
         if (s == null)
         {
             return false;
@@ -354,7 +422,7 @@
 
             for (int i = 0; i < size; i++)
             {
-                hash = (31 * hash) + _data.get(i);
+                hash = (31 * hash) + _data[i+_offset];
             }
 
             _hashCode = hash;
@@ -389,8 +457,8 @@
 
             for (int i = 0; i < length(); i++)
             {
-                final byte d = _data.get(i);
-                final byte n = name._data.get(i);
+                final byte d = _data[i+_offset];
+                final byte n = name._data[i+name._offset];
                 if (d < n)
                 {
                     return -1;
@@ -406,6 +474,12 @@
         }
     }
 
+    /** Returns a new tokenizer that splits this string on the given delimiter byte. */
+    public AMQShortStringTokenizer tokenize(byte delim)
+    {
+        return new TokenizerImpl(delim);
+    }
+
     public AMQShortString intern()
     {
 
@@ -443,4 +517,111 @@
         return internString;
 
     }
+
+    /** Counts the bytes within this string's window of _data that equal delim. */
+    private int occurences(final byte delim)
+    {
+        int matches = 0;
+        for(int idx = _offset + _length - 1; idx >= _offset; idx--)
+        {
+            if(_data[idx] == delim)
+            {
+                matches++;
+            }
+        }
+        return matches;
+    }
+
+    /**
+     * Finds the first occurrence of val at or after pos; pos and the result are relative to this string.
+     * @return the index of the match, or -1 if there is none
+     */
+    private int indexOf(final byte val, final int pos)
+    {
+        int idx = pos;
+        while(idx < length() && _data[_offset + idx] != val)
+        {
+            idx++;
+        }
+        return (idx < length()) ? idx : -1;
+    }
+
+    /**
+     * Concatenates terms in iteration order, inserting delim between each adjacent pair.
+     * @param terms the strings to join
+     * @param delim the separator
+     * @return EMPTY_STRING if terms is empty, otherwise a new string over a freshly allocated array
+     */
+    public static AMQShortString join(final Collection<AMQShortString> terms,
+                                       final AMQShortString delim)
+    {
+        if(terms.isEmpty())
+        {
+            return EMPTY_STRING;
+        }
+
+        int size = delim.length() * (terms.size() - 1);
+        for(AMQShortString term : terms)
+        {
+            size += term.length();
+        }
+
+        final byte[] data = new byte[size];
+        int pos = 0;
+        // Track the first term explicitly: testing pos != 0 drops the delimiter after leading empty terms.
+        boolean first = true;
+        for(AMQShortString term : terms)
+        {
+            if(!first)
+            {
+                System.arraycopy(delim._data, delim._offset, data, pos, delim._length);
+                pos += delim._length;
+            }
+            first = false;
+            System.arraycopy(term._data, term._offset, data, pos, term._length);
+            pos += term._length;
+        }
+        return new AMQShortString(data, 0, size);
+    }
+
+    /**
+     * Parses this string as a decimal integer with an optional leading '-'; overflow is not detected.
+     * @return the parsed value
+     * @throws NumberFormatException if there are no digits or any byte is not an ASCII digit
+     */
+    public int toIntValue()
+    {
+        // Index via _offset: a string made by substring()/tokenize() is a window onto a larger array.
+        final int end = _offset + _length;
+        final boolean isNegative = (_length > 0) && (_data[_offset] == MINUS);
+        int pos = isNegative ? (_offset + 1) : _offset;
+        if(pos == end)
+        {
+            throw new NumberFormatException("\""+toString()+"\" is not a valid number");
+        }
+        int val = 0;
+        while(pos < end)
+        {
+            final int digit = _data[pos++] - ZERO;
+            if((digit < 0) || (digit > 9))
+            {
+                throw new NumberFormatException("\""+toString()+"\" is not a valid number");
+            }
+            val = (val * 10) + digit;
+        }
+        return isNegative ? -val : val;
+    }
+
+    /**
+     * Tests whether this string contains the given byte.
+     *
+     * @param b the byte to look for
+     * @return true if b occurs within this string's own bytes
+     */
+    public boolean contains(final byte b)
+    {
+        // Delegate to indexOf, which honours _offset; scanning _data[0.._length) is wrong for substrings.
+        return indexOf(b, 0) != -1;
+    }
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java Thu Feb 28 04:16:41 2008
@@ -24,7 +24,6 @@
 
 public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
 {
-    private AMQDataBlock _firstFrame;
 
     private AMQDataBlock[] _blocks;
 
@@ -33,27 +32,12 @@
         _blocks = blocks;
     }
 
-    /**
-     * The encoded block will be logically first before the AMQDataBlocks which are encoded
-     * into the buffer afterwards.
-     * @param encodedBlock already-encoded data
-     * @param blocks some blocks to be encoded.
-     */
-    public CompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock[] blocks)
-    {
-        this(blocks);
-        _firstFrame = encodedBlock;
-    }
 
     public AMQDataBlock[] getBlocks()
     {
         return _blocks;
     }
 
-    public AMQDataBlock getFirstFrame()
-    {
-        return _firstFrame;
-    }
 
     public long getSize()
     {
@@ -62,19 +46,11 @@
         {
             frameSize += _blocks[i].getSize();
         }
-        if (_firstFrame != null)
-        {
-            frameSize += _firstFrame.getSize();
-        }
         return frameSize;
     }
 
     public void writePayload(ByteBuffer buffer)
     {
-        if (_firstFrame != null)
-        {
-            _firstFrame.writePayload(buffer);
-        }
         for (int i = 0; i < _blocks.length; i++)
         {
             _blocks[i].writePayload(buffer);
@@ -90,7 +66,7 @@
         else
         {
             StringBuilder buf = new StringBuilder(this.getClass().getName());
-            buf.append("{encodedBlock=").append(_firstFrame);
+            buf.append("{");
             for (int i = 0 ; i < _blocks.length; i++)
             {
                 buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]");

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Thu Feb 28 04:16:41 2008
@@ -21,6 +21,8 @@
 package org.apache.qpid.framing;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
 
 public class ContentBody implements AMQBody
 {
@@ -66,6 +68,12 @@
             ByteBuffer copy = payload.duplicate();
             buffer.put(copy.rewind());
         }
+    }
+    /** Passes this content body, together with channelId, to the session's contentBodyReceived callback. */
+    public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+            throws AMQException
+    {
+        session.contentBodyReceived(channelId, this);
     }
 
     protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Thu Feb 28 04:16:41 2008
@@ -21,6 +21,8 @@
 package org.apache.qpid.framing;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
 
 public class ContentHeaderBody implements AMQBody
 {
@@ -108,6 +110,12 @@
         buffer.putLong(bodySize);
         EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags());
         properties.writePropertyListPayload(buffer);
+    }
+    /** Passes this content header, together with channelId, to the session's contentHeaderReceived callback. */
+    public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+            throws AMQException
+    {
+        session.contentHeaderReceived(channelId, this);
     }
 
     public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Thu Feb 28 04:16:41 2008
@@ -21,6 +21,8 @@
 package org.apache.qpid.framing;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
 
 public class HeartbeatBody implements AMQBody
 {
@@ -53,6 +55,12 @@
 
     public void writePayload(ByteBuffer buffer)
     {
+    }
+    /** Passes this heartbeat body, together with channelId, to the session's heartbeatBodyReceived callback. */
+    public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+            throws AMQException
+    {
+        session.heartbeatBodyReceived(channelId, this);
     }
 
     protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Thu Feb 28 04:16:41 2008
@@ -70,7 +70,7 @@
         return new MethodConverter_0_9.MessagePublishInfoImpl(exchange,
                                           publishBody.getImmediate(),
                                           publishBody.getMandatory(),
-                                          routingKey == null ? null : routingKey.intern());
+                                          routingKey);
 
     }