You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC

svn commit: r507672 [10/16] - in /incubator/qpid/branches/qpid.0-9: gentools/src/org/apache/qpid/gentools/ gentools/templ.java/ gentools/templ.net/ java/ java/broker/ java/broker/bin/ java/broker/distribution/ java/broker/distribution/src/ java/broker/...

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageRejectMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -21,14 +21,18 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.MessageRejectBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.MessageRejectBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
+//import org.apache.log4j.Logger;
 
 public class MessageRejectMethodHandler implements StateAwareMethodListener
 {
+    //private static final Logger _logger = Logger.getLogger(MessageRejectMethodHandler.class);
+
     private static MessageRejectMethodHandler _instance = new MessageRejectMethodHandler();
 
     public static MessageRejectMethodHandler getInstance()
@@ -37,12 +41,8 @@
     }
 
     private MessageRejectMethodHandler() {}
-    
-    
-    public void methodReceived (AMQStateManager stateManager,
-                                AMQProtocolSession protocolSession,
-                               	AMQMethodEvent evt)
-                                throws AMQException
+
+    public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
     {
 		// TODO
     }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageResumeMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -21,14 +21,18 @@
 package org.apache.qpid.client.handler;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.MessageResumeBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.framing.MessageResumeBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
+//import org.apache.log4j.Logger;
 
 public class MessageResumeMethodHandler implements StateAwareMethodListener
 {
+    //private static final Logger _logger = Logger.getLogger(MessageResumeMethodHandler.class);
+
     private static MessageResumeMethodHandler _instance = new MessageResumeMethodHandler();
 
     public static MessageResumeMethodHandler getInstance()
@@ -37,14 +41,10 @@
     }
 
     private MessageResumeMethodHandler() {}
-    
-    
-    public void methodReceived (AMQStateManager stateManager,
-                                AMQProtocolSession protocolSession,
-                               	AMQMethodEvent evt)
-                                throws AMQException
+
+    public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
     {
-		System.out.println("");
+		// TODO
     }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/MessageTransferMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid.client.handler;
 
-import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.MessageHeaders;
 import org.apache.qpid.client.message.UnprocessedMessage;
@@ -31,10 +30,13 @@
 import org.apache.qpid.framing.MessageTransferBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
+import org.apache.log4j.Logger;
+
 public class MessageTransferMethodHandler implements StateAwareMethodListener
 {
-    private static MessageTransferMethodHandler _instance = new MessageTransferMethodHandler();
     private static final Logger _logger = Logger.getLogger(MessageTransferMethodHandler.class);
+
+    private static MessageTransferMethodHandler _instance = new MessageTransferMethodHandler();
     
     public static MessageTransferMethodHandler getInstance()
     {
@@ -42,18 +44,11 @@
     }
 
     private MessageTransferMethodHandler() {}
-    
-    
-    public void methodReceived (AMQStateManager stateManager,
-                                AMQProtocolSession protocolSession,
-                               	AMQMethodEvent evt)
-                                throws AMQException
+
+    public void methodReceived (AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
     {
-    	final UnprocessedMessage msg = new UnprocessedMessage();
     	MessageTransferBody transferBody = (MessageTransferBody) evt.getMethod();
         
-        msg.channelId = evt.getChannelId();
-        msg.deliveryTag = evt.getRequestId();
         _logger.debug("New JmsDeliver method received");
         
         MessageHeaders messageHeaders = new MessageHeaders();
@@ -73,7 +68,7 @@
         messageHeaders.setDeliveryMode(transferBody.getDeliveryMode());
         messageHeaders.setJMSHeaders(transferBody.getApplicationHeaders());
         
-        msg.contentHeader = messageHeaders;
+    	final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), evt.getRequestId(), messageHeaders);
         
         if(transferBody.getBody().getContentType() == Content.TypeEnum.INLINE_T)
         {

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -17,38 +17,35 @@
  */
 package org.apache.qpid.client.handler;
 
+import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.QueueDeleteOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
+
 import org.apache.log4j.Logger;
 
-/**
- * @author Apache Software Foundation
- */
 public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
 {
-     private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class);
-     private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
+    private static final Logger _logger = Logger.getLogger(QueueDeleteOkMethodHandler.class);
+
+    private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
+
+    public static QueueDeleteOkMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private QueueDeleteOkMethodHandler() {}
 
-     public static QueueDeleteOkMethodHandler getInstance()
-     {
-         return _instance;
-     }
-
-     private QueueDeleteOkMethodHandler()
-     {
-     }
-
-     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
-     {
-         if (_logger.isDebugEnabled())
-         {
+    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    {
+        if (_logger.isDebugEnabled())
+        {
             QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
             _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
-         }
-     }
+        }
+    }
 }
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Wed Feb 14 12:02:03 2007
@@ -22,6 +22,7 @@
 
 import java.io.IOException;
 import java.nio.charset.Charset;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.JMSException;
 import javax.jms.MessageEOFException;
@@ -54,7 +55,7 @@
     AbstractBytesMessage(ByteBuffer data)
     {
         super(data); // this instanties a content header
-        getMessageHeaders().setContentType(getMimeType());
+        getMessageHeaders().setContentType(getMimeTypeAsShortString());
 
         if (_data == null)
         {
@@ -68,13 +69,12 @@
         _data.setAutoExpand(true);
     }
 
-    
     AbstractBytesMessage(long messageNbr, MessageHeaders contentHeader, ByteBuffer data)
             throws AMQException
     {
         // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
         super(messageNbr, contentHeader, data);
-        getMessageHeaders().setContentType(getMimeType());
+        getMessageHeaders().setContentType(getMimeTypeAsShortString());
     }
 
     public void clearBodyImpl() throws JMSException

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java Wed Feb 14 12:02:03 2007
@@ -10,6 +10,7 @@
 import javax.jms.MessageNotWriteableException;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
 
 /**

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Feb 14 12:02:03 2007
@@ -34,15 +34,22 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.client.CustomJMXProperty;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.url.AMQBindingURL;
 import org.apache.qpid.url.BindingURL;
 import org.apache.qpid.url.URLSyntaxException;
 
+import javax.jms.*;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.Map;
+
 public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
 {
     private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
+                                                                  
 
     protected boolean _redelivered;
 
@@ -51,6 +58,7 @@
     protected boolean _readableMessage = false;
     protected boolean _changedData;
     private Destination _destination;
+    private JMSHeaderAdapter _headerAdapter;
     private BasicMessageConsumer _consumer;
 
     protected AbstractJMSMessage(ByteBuffer data)
@@ -84,17 +92,29 @@
         _readableProperties = (_messageHeaders != null);
     }
 
-    public String getJMSMessageID() throws JMSException
+    public AMQShortString getJMSMessageIDAsShortString() throws JMSException
     {
         if (getMessageHeaders().getMessageId() == null)
         {
-            getMessageHeaders().setMessageId("ID:" + _deliveryTag);
+            getMessageHeaders().setMessageId(new AMQShortString("ID:" + _deliveryTag));
         }
         return getMessageHeaders().getMessageId();
     }
+    
+    // The String version is required for javax.jms.Message
+    public String getJMSMessageID() throws JMSException
+    {
+        return getJMSMessageIDAsShortString().asString();
+    }
 
+    // The String version is required for javax.jms.Message
     public void setJMSMessageID(String messageId) throws JMSException
     {
+        setJMSMessageID(new AMQShortString(messageId));
+    }
+    
+    public void setJMSMessageID(AMQShortString messageId) throws JMSException
+    {
         getMessageHeaders().setMessageId(messageId);
     }
 
@@ -115,22 +135,35 @@
 
     public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
     {
-        getMessageHeaders().setCorrelationId(new String(bytes));
+        getMessageHeaders().setCorrelationId(new AMQShortString(bytes));
     }
 
+    // The String version is required for javax.jms.Message
     public void setJMSCorrelationID(String correlationId) throws JMSException
     {
+        setJMSCorrelationID(new AMQShortString(correlationId));
+    }
+    
+    public void setJMSCorrelationID(AMQShortString correlationId) throws JMSException
+    {
         getMessageHeaders().setCorrelationId(correlationId);
     }
 
-    public String getJMSCorrelationID() throws JMSException
+    public AMQShortString getJMSCorrelationIDAsShortString() throws JMSException
     {
         return getMessageHeaders().getCorrelationId();
     }
 
+    // The String version is required for javax.jms.Message
+    public String getJMSCorrelationID() throws JMSException
+    {
+        AMQShortString ss = getMessageHeaders().getCorrelationId();
+        return ss == null ? null : ss.asString();
+    }
+
     public Destination getJMSReplyTo() throws JMSException
     {
-        String replyToEncoding = getMessageHeaders().getReplyTo();
+        AMQShortString replyToEncoding = getMessageHeaders().getReplyTo();
         if (replyToEncoding == null)
         {
             return null;
@@ -142,7 +175,7 @@
             {
                 try
                 {
-                    BindingURL binding = new AMQBindingURL(replyToEncoding);
+                    BindingURL binding = new AMQBindingURL(replyToEncoding.asString());
                     dest = AMQDestination.createDestination(binding);
                 }
                 catch (URLSyntaxException e)
@@ -165,11 +198,11 @@
         if (!(destination instanceof AMQDestination))
         {
             throw new IllegalArgumentException("ReplyTo destination may only be an AMQDestination - passed argument was type " +
-                                               destination.getClass());
+                    destination.getClass());
         }
         final AMQDestination amqd = (AMQDestination) destination;
 
-        final String encodedDestination = amqd.getEncodedName();
+        final AMQShortString encodedDestination = amqd.getEncodedName();
         _destinationCache.put(encodedDestination, destination);
         getMessageHeaders().setReplyTo(encodedDestination);
     }
@@ -179,7 +212,7 @@
         return _destination;
     }
 
-    public void setJMSDestination(Destination destination) throws JMSException
+    public void setJMSDestination(Destination destination)
     {
         _destination = destination;
     }
@@ -209,7 +242,7 @@
         // Since the type field is not a part of message.transport and is used only for
         // JMS messages, this change to JMS Headers solves the problem.
         // return getMessageHeaders().getType();
-        return getStringProperty(CustomJMXProperty.JMSXType.toString());
+        return getStringProperty(CustomJMSXProperty.JMSXType.toString());
     }
 
     public void setJMSType(String string) throws JMSException
@@ -217,7 +250,7 @@
         // Since the type field is not a part of message.transport and is used only for
         // JMS messages, this change to JMS Headers solves the problem.
         // getMessageHeaders().setType(string);
-        setStringProperty(CustomJMXProperty.JMSXType.toString(), string);
+        setStringProperty(CustomJMSXProperty.JMSXType.toString(), string);
     }
 
     public long getJMSExpiration() throws JMSException
@@ -243,7 +276,6 @@
     public void clearProperties() throws JMSException
     {
         getMessageHeaders().getJMSHeaders().clear();
-
         _readableProperties = false;
     }
 
@@ -253,11 +285,24 @@
         _readableMessage = false;
     }
 
+    public boolean propertyExists(AMQShortString propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        return getMessageHeaders().getJMSHeaders().propertyExists(propertyName);
+    }
 
     public boolean propertyExists(String propertyName) throws JMSException
     {
+        return propertyExists(new AMQShortString(propertyName));
+    }
+
+    public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
+    {
         checkPropertyName(propertyName);
-        return getMessageHeaders().getJMSHeaders().propertyExists(propertyName);
+        Boolean b = getMessageHeaders().getJMSHeaders().getBoolean(propertyName);
+        if (b != null)
+            return b;
+        return false;
     }
 
     public boolean getBooleanProperty(String propertyName) throws JMSException
@@ -267,7 +312,6 @@
         if (b != null)
             return b;
         return false;
-//        return getMessageHeaders().getJMSHeaders().getBoolean(propertyName);
     }
 
     public byte getByteProperty(String propertyName) throws JMSException
@@ -277,7 +321,13 @@
         if (b == null)
             throw new NumberFormatException("Byte value null");
         return b;
-//        return getMessageHeaders().getJMSHeaders().getByte(propertyName);
+    }
+
+    public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
+    {
+        checkPropertyName(propertyName);
+        return getMessageHeaders().getJMSHeaders().getBytes(propertyName);
+
     }
 
     public short getShortProperty(String propertyName) throws JMSException
@@ -291,7 +341,6 @@
         if (b == null)
             throw new NumberFormatException("Short value null");
         return (short)b;
-//        return getMessageHeaders().getJMSHeaders().getShort(propertyName);
     }
 
     public int getIntProperty(String propertyName) throws JMSException
@@ -309,7 +358,6 @@
         if (b == null)
             throw new NumberFormatException("Int value null");
         return (short)b;
-//        return getMessageHeaders().getJMSHeaders().getInteger(propertyName);
     }
 
     public long getLongProperty(String propertyName) throws JMSException
@@ -318,6 +366,7 @@
         Long l = getMessageHeaders().getJMSHeaders().getLong(propertyName);
         if (l != null)
             return l;
+        // try Integer
         Integer i = getMessageHeaders().getJMSHeaders().getInteger(propertyName);
         if (i != null)
             return i;
@@ -330,7 +379,6 @@
         if (b == null)
             throw new NumberFormatException("Long value null");
         return (short)b;
-//        return getMessageHeaders().getJMSHeaders().getLong(propertyName);
     }
 
     public float getFloatProperty(String propertyName) throws JMSException
@@ -374,13 +422,18 @@
         return getMessageHeaders().getJMSHeaders().getPropertyNames();
     }
 
-    public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+    public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
     {
         checkWritableProperties();
         checkPropertyName(propertyName);
         getMessageHeaders().getJMSHeaders().setBoolean(propertyName, b);
     }
 
+    public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+    {
+        setBooleanProperty(new AMQShortString(propertyName), b);
+    }
+
     public void setByteProperty(String propertyName, byte b) throws JMSException
     {
         checkWritableProperties();
@@ -437,7 +490,7 @@
         getMessageHeaders().getJMSHeaders().setObject(propertyName, object);
     }
 
-    protected void removeProperty(String propertyName) throws JMSException
+    protected void removeProperty(AMQShortString propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
         getMessageHeaders().getJMSHeaders().remove(propertyName);
@@ -468,7 +521,7 @@
 
     public void acknowledge() throws JMSException
     {
-        if(_session != null)
+        if (_session != null)
         {
             _session.acknowledge();
         }
@@ -488,7 +541,12 @@
      */
     public abstract String toBodyString() throws JMSException;
 
-    public abstract String getMimeType();
+    public String getMimeType()
+    {
+        return getMimeTypeAsShortString().toString();
+    }
+
+    public abstract AMQShortString getMimeTypeAsShortString();
 
     public String toString()
     {
@@ -496,6 +554,7 @@
         {
             StringBuffer buf = new StringBuffer("Body:\n");
             buf.append(toBodyString());
+            buf.append("\nJMS Correlation ID: ").append(getJMSCorrelationID());
             buf.append("\nJMS timestamp: ").append(getJMSTimestamp());
             buf.append("\nJMS expiration: ").append(getJMSExpiration());
             buf.append("\nJMS priority: ").append(getJMSPriority());
@@ -525,13 +584,13 @@
         getMessageHeaders().setJMSHeaders(messageProperties);
     }
 
-    private void checkPropertyName(String propertyName)
+    private void checkPropertyName(CharSequence propertyName)
     {
         if (propertyName == null)
         {
             throw new IllegalArgumentException("Property name must not be null");
         }
-        else if ("".equals(propertyName))
+        else if (propertyName.length() == 0)
         {
             throw new IllegalArgumentException("Property name must not be the empty string");
         }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.client.message;
 
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+
 import java.util.Iterator;
 import java.util.List;
 
@@ -27,7 +30,6 @@
 
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
 
 public abstract class AbstractJMSMessageFactory implements MessageFactory
 {
@@ -61,5 +63,4 @@
         msg.setJMSRedelivered(redelivered);
         return msg;
     }
-
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Wed Feb 14 12:02:03 2007
@@ -25,6 +25,7 @@
 import java.nio.charset.Charset;
 import java.nio.charset.CharsetDecoder;
 import java.nio.charset.CharsetEncoder;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
@@ -35,9 +36,11 @@
 
 public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
 {
-    private static final String MIME_TYPE = "application/octet-stream";
+    public static final String MIME_TYPE = "application/octet-stream";
+    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
 
-    JMSBytesMessage()
+
+    public JMSBytesMessage()
     {
         this(null);
     }
@@ -65,9 +68,9 @@
         _readableMessage = true;
     }
 
-    public String getMimeType()
+    public AMQShortString getMimeTypeAsShortString()
     {
-        return MIME_TYPE;
+        return MIME_TYPE_SHORT_STRING;
     }
 
     public long getBodyLength() throws JMSException

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -24,6 +24,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 
 public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
 {

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Wed Feb 14 12:02:03 2007
@@ -25,13 +25,14 @@
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
 
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
 
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
 
 public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage
 {
@@ -39,10 +40,11 @@
 
 
     public static final String MIME_TYPE = "jms/map-message";
+    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
 
     private Map<String,Object> _map = new HashMap<String, Object>();
 
-    JMSMapMessage() throws JMSException
+    public JMSMapMessage() throws JMSException
     {
         this(null);
     }
@@ -67,15 +69,14 @@
 		}
 	}	
 
-    
     public String toBodyString() throws JMSException
     {
         return _map.toString();
     }
 
-    public String getMimeType()
+    public AMQShortString getMimeTypeAsShortString()
     {
-        return MIME_TYPE;
+        return MIME_TYPE_SHORT_STRING;
     }
 
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
 import javax.jms.JMSException;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
 
 public class JMSMapMessageFactory extends AbstractJMSMessageFactory

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Wed Feb 14 12:02:03 2007
@@ -27,6 +27,7 @@
 import java.io.Serializable;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.JMSException;
 import javax.jms.MessageFormatException;
@@ -37,14 +38,15 @@
 
 public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
 {
-    static final String MIME_TYPE = "application/java-object-stream";
+    public static final String MIME_TYPE = "application/java-object-stream";
+    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
 
     private static final int DEFAULT_BUFFER_SIZE = 1024;
 
     /**
      * Creates empty, writable message for use by producers
      */
-    JMSObjectMessage()
+    public JMSObjectMessage()
     {
         this(null);
     }
@@ -57,7 +59,7 @@
             _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
             _data.setAutoExpand(true);
         }
-        getMessageHeaders().setContentType(MIME_TYPE);
+        getMessageHeaders().setContentType(MIME_TYPE_SHORT_STRING);
     }
 
     /**
@@ -83,9 +85,9 @@
         return toString(_data);
     }
 
-    public String getMimeType()
+    public AMQShortString getMimeTypeAsShortString()
     {
-        return MIME_TYPE;
+        return MIME_TYPE_SHORT_STRING;
     }
 
     public void setObject(Serializable serializable) throws JMSException

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -24,6 +24,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 
 public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
 {

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Wed Feb 14 12:02:03 2007
@@ -25,6 +25,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 
 /**
  * @author Apache Software Foundation
@@ -32,6 +33,7 @@
 public class JMSStreamMessage extends AbstractBytesTypedMessage implements StreamMessage
 {
     public static final String MIME_TYPE="jms/stream-message";
+    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
 
 
     /**
@@ -40,7 +42,7 @@
      */
     private int _byteArrayRemaining = -1;
 
-    JMSStreamMessage()
+    public JMSStreamMessage()
     {
         this(null);
     }
@@ -69,9 +71,9 @@
         _readableMessage = true;
     }
 
-    public String getMimeType()
+    public AMQShortString getMimeTypeAsShortString()
     {
-        return MIME_TYPE;
+        return MIME_TYPE_SHORT_STRING;
     }
 
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
 import javax.jms.JMSException;
 
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.AMQException;
 
 public class JMSStreamMessageFactory extends AbstractJMSMessageFactory

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessage.java Wed Feb 14 12:02:03 2007
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.client.message;
 
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.AMQException;
+
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
@@ -27,42 +30,49 @@
 import javax.jms.JMSException;
 
 import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
 
 public class JMSTextMessage extends AbstractJMSMessage implements javax.jms.TextMessage
 {
     private static final String MIME_TYPE = "text/plain";
+    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
 
     private String _decodedValue;
 
     /**
      * This constant represents the name of a property that is set when the message payload is null.
      */
-    private static final String PAYLOAD_NULL_PROPERTY = "JMS_QPID_NULL";
+    private static final AMQShortString PAYLOAD_NULL_PROPERTY = new AMQShortString("JMS_QPID_NULL");
+    private static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");
 
-    JMSTextMessage() throws JMSException
+    public JMSTextMessage() throws JMSException
     {
-        this(null, null);
+        this(null, (AMQShortString)null);
     }
 
-    JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
+    JMSTextMessage(ByteBuffer data, AMQShortString encoding) throws JMSException
     {
         super(data); // this instantiates a content header
-        getMessageHeaders().setContentType(MIME_TYPE);
+        getMessageHeaders().setContentType(MIME_TYPE_SHORT_STRING);
         getMessageHeaders().setEncoding(encoding);
     }
 
+    JMSTextMessage(ByteBuffer data, String encoding) throws JMSException
+    {
+        this(data, new AMQShortString(encoding));
+    }
+
     JMSTextMessage(long deliveryTag, MessageHeaders contentHeader, ByteBuffer data)
             throws AMQException
     {
         super(deliveryTag, contentHeader, data);
-        contentHeader.setContentType(MIME_TYPE);
+        contentHeader.setContentType(MIME_TYPE_SHORT_STRING);
         _data = data;
     }
 
     JMSTextMessage(ByteBuffer data) throws JMSException
     {
-        this(data, null);
+        this(data, (AMQShortString)null);
     }
 
     JMSTextMessage(String text) throws JMSException
@@ -91,9 +101,9 @@
         _data = data;
     }
 
-    public String getMimeType()
+    public AMQShortString getMimeTypeAsShortString()
     {
-        return MIME_TYPE;
+        return MIME_TYPE_SHORT_STRING;
     }
 
     public void setText(String text) throws JMSException
@@ -115,7 +125,7 @@
                 }
                 else
                 {
-                    _data.put(text.getBytes(getMessageHeaders().getEncoding()));
+                    _data.put(text.getBytes(getMessageHeaders().getEncoding().asString()));
                 }
                 _changedData=true;
             }
@@ -151,7 +161,7 @@
             {
                 try
                 {
-                    _decodedValue = _data.getString(Charset.forName(getMessageHeaders().getEncoding()).newDecoder());
+                    _decodedValue = _data.getString(Charset.forName(getMessageHeaders().getEncoding().asString()).newDecoder());
                 }
                 catch (CharacterCodingException e)
                 {
@@ -164,7 +174,7 @@
             {
                 try
                 {
-                    _decodedValue = _data.getString(Charset.defaultCharset().newDecoder());
+                    _decodedValue = _data.getString(DEFAULT_CHARSET.newDecoder());
                 }
                 catch (CharacterCodingException e)
                 {

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/JMSTextMessageFactory.java Wed Feb 14 12:02:03 2007
@@ -24,6 +24,7 @@
 
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 
 public class JMSTextMessageFactory extends AbstractJMSMessageFactory
 {

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactory.java Wed Feb 14 12:02:03 2007
@@ -23,6 +23,7 @@
 import java.util.List;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
 
 import javax.jms.JMSException;
 

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Wed Feb 14 12:02:03 2007
@@ -28,10 +28,12 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.Content;
+import org.apache.qpid.framing.AMQShortString;
 
 public class MessageFactoryRegistry
 {
-    private final Map _mimeToFactoryMap = new HashMap();
+    private final Map<String, MessageFactory> _mimeStringToFactoryMap = new HashMap<String, MessageFactory>();
+    private final Map<AMQShortString, MessageFactory> _mimeShortStringToFactoryMap = new HashMap<AMQShortString, MessageFactory>();
 
     public void registerFactory(String mimeType, MessageFactory mf)
     {
@@ -39,12 +41,14 @@
         {
             throw new IllegalArgumentException("Message factory must not be null");
         }
-        _mimeToFactoryMap.put(mimeType, mf);
+        _mimeStringToFactoryMap.put(mimeType, mf);
+        _mimeShortStringToFactoryMap.put(new AMQShortString(mimeType), mf);
     }
 
     public MessageFactory deregisterFactory(String mimeType)
     {
-        return (MessageFactory) _mimeToFactoryMap.remove(mimeType);
+        _mimeShortStringToFactoryMap.remove(new AMQShortString(mimeType));
+        return _mimeStringToFactoryMap.remove(mimeType);
     }
 
     /**
@@ -62,7 +66,7 @@
                                             MessageHeaders contentHeader,
                                             List contents) throws AMQException, JMSException
     {
-        MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(contentHeader.getContentType());
+        MessageFactory mf =  _mimeShortStringToFactoryMap.get(contentHeader.getContentType());
         if (mf == null)
         {
             throw new AMQException("Unsupport MIME type of " + contentHeader.getContentType());
@@ -79,7 +83,7 @@
         {
             throw new IllegalArgumentException("Mime type must not be null");
         }
-        MessageFactory mf = (MessageFactory) _mimeToFactoryMap.get(mimeType);
+        MessageFactory mf = _mimeStringToFactoryMap.get(mimeType);
         if (mf == null)
         {
             throw new AMQException("Unsupport MIME type of " + mimeType);
@@ -100,7 +104,7 @@
         mf.registerFactory(JMSMapMessage.MIME_TYPE, new JMSMapMessageFactory());
         mf.registerFactory("text/plain", new JMSTextMessageFactory());
         mf.registerFactory("text/xml", new JMSTextMessageFactory());
-        mf.registerFactory("application/octet-stream", new JMSBytesMessageFactory());
+        mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
         mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
         mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
         mf.registerFactory(null, new JMSBytesMessageFactory());

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/MessageHeaders.java Wed Feb 14 12:02:03 2007
@@ -24,6 +24,7 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
 
@@ -35,13 +36,13 @@
 {
     private static final Logger _logger = Logger.getLogger(MessageHeaders.class);
     
-    private String _contentType;
+    private AMQShortString _contentType;
 
-    private String _encoding;
+    private AMQShortString _encoding;
     
-    private String _destination;
+    private AMQShortString _destination;
     
-    private String _exchange;
+    private AMQShortString _exchange;
 
     private FieldTable _jmsHeaders;
 
@@ -49,33 +50,35 @@
 
     private short _priority;
 
-    private String _correlationId;
+    private AMQShortString _correlationId;
 
-    private String _replyTo;
+    private AMQShortString _replyTo;
 
     private long _expiration;
 
-    private String _messageId;
+    private AMQShortString _messageId;
 
     private long _timestamp;
 
-    private String _type;
+    private AMQShortString _type;
 
-    private String _userId;
+    private AMQShortString _userId;
 
-    private String _appId;
+    private AMQShortString _appId;
 
-    private String _transactionId;
+    private AMQShortString _transactionId;
 
-    private String _routingKey;
+    private AMQShortString _routingKey;
     
     private int _size;
     
-    public int getSize() {
+    public int getSize()
+    {
 		return _size;
 	}
 
-	public void setSize(int size) {
+	public void setSize(int size)
+    {
 		this._size = size;
 	}
 
@@ -83,29 +86,24 @@
     {
     }
 
-    public String getContentTypeShortString()
+    public AMQShortString getContentType()
     {
         return _contentType;
     }
 
-    public String getContentType()
+    public void setContentType(AMQShortString contentType)
     {
-        return _contentType == null ? null : _contentType.toString();
+        _contentType = contentType;
     }
 
-    public void setContentType(String contentType)
+    public AMQShortString getEncoding()
     {
-        _contentType = contentType == null ? null : new String(contentType);
+        return _encoding;
     }
 
-    public String getEncoding()
+    public void setEncoding(AMQShortString encoding)
     {
-        return _encoding == null ? null : _encoding.toString();
-    }
-
-    public void setEncoding(String encoding)
-    {
-        _encoding = encoding == null ? null : new String(encoding);
+        _encoding = encoding;
     }
 
     public FieldTable getJMSHeaders()
@@ -144,22 +142,22 @@
         _priority = priority;
     }
 
-    public String getCorrelationId()
+    public AMQShortString getCorrelationId()
     {
-        return _correlationId == null ? null : _correlationId.toString();
+        return _correlationId;
     }
 
-    public void setCorrelationId(String correlationId)
+    public void setCorrelationId(AMQShortString correlationId)
     {
-        _correlationId = correlationId == null ? null : new String(correlationId);
+        _correlationId = correlationId;
     }
 
-    public String getReplyTo()
+    public AMQShortString getReplyTo()
     {
-        return _replyTo == null ? null : _replyTo.toString();
+        return _replyTo;
     }
     
-    public void setReplyTo(String replyTo)
+    public void setReplyTo(AMQShortString replyTo)
     {
         _replyTo = replyTo;
     }
@@ -175,12 +173,12 @@
     }
 
 
-    public String getMessageId()
+    public AMQShortString getMessageId()
     {
         return _messageId;
     }
 
-    public void setMessageId(String messageId)
+    public void setMessageId(AMQShortString messageId)
     {
         _messageId = messageId;
     }
@@ -195,39 +193,39 @@
         _timestamp = timestamp;
     }
 
-    public String getType()
+    public AMQShortString getType()
     {
-        return _type == null ? null : _type.toString();
+        return _type;
     }
 
-    public void setType(String type)
+    public void setType(AMQShortString type)
     {
-        _type = type == null ? null : new String(type);
+        _type = type;
     }
 
-    public String getUserId()
+    public AMQShortString getUserId()
     {
-        return _userId == null ? null : _userId.toString();
+        return _userId;
     }
 
-    public void setUserId(String userId)
+    public void setUserId(AMQShortString userId)
     {
-        _userId = userId == null ? null : new String(userId);
+        _userId = userId;
     }
 
-    public String getAppId()
+    public AMQShortString getAppId()
     {
-        return _appId == null ? null : _appId.toString();
+        return _appId;
     }
 
-    public void setAppId(String appId)
+    public void setAppId(AMQShortString appId)
     {
-        _appId = appId == null ? null : new String(appId);
+        _appId = appId;
     }
 
     // MapMessage  Interface
 
-    public boolean getBoolean(String string) throws JMSException
+    public boolean getBoolean(AMQShortString string) throws JMSException
     {
         Boolean b = getJMSHeaders().getBoolean(string);
 
@@ -237,13 +235,13 @@
             {
                 Object str = getJMSHeaders().getObject(string);
 
-                if (str == null || !(str instanceof String))
+                if (str == null || !(str instanceof AMQShortString))
                 {
                     throw new MessageFormatException("getBoolean can't use " + string + " item.");
                 }
                 else
                 {
-                    return Boolean.valueOf((String) str);
+                    return Boolean.valueOf(((AMQShortString)str).asString());
                 }
             }
             else
@@ -255,13 +253,13 @@
         return b;
     }
 
-    public char getCharacter(String string) throws JMSException
+    public char getCharacter(AMQShortString string) throws JMSException
     {
         Character c = getJMSHeaders().getCharacter(string);
 
         if (c == null)
         {
-            if (getJMSHeaders().isNullStringValue(string))
+            if (getJMSHeaders().isNullStringValue(string.asString()))
             {
                 throw new NullPointerException("Cannot convert null char");
             }
@@ -276,7 +274,7 @@
         }
     }
 
-    public byte[] getBytes(String string) throws JMSException
+    public byte[] getBytes(AMQShortString string) throws JMSException
     {
         byte[] bs = getJMSHeaders().getBytes(string);
 
@@ -290,7 +288,7 @@
         }
     }
 
-    public byte getByte(String string) throws JMSException
+    public byte getByte(AMQShortString string) throws JMSException
     {
             Byte b = getJMSHeaders().getByte(string);
             if (b == null)
@@ -299,13 +297,13 @@
                 {
                     Object str = getJMSHeaders().getObject(string);
 
-                    if (str == null || !(str instanceof String))
+                    if (str == null || !(str instanceof AMQShortString))
                     {
                         throw new MessageFormatException("getByte can't use " + string + " item.");
                     }
                     else
                     {
-                        return Byte.valueOf((String) str);
+                        return Byte.valueOf(((AMQShortString)str).asString());
                     }
                 }
                 else
@@ -317,7 +315,7 @@
             return b;
     }
 
-    public short getShort(String string) throws JMSException
+    public short getShort(AMQShortString string) throws JMSException
     {
             Short s = getJMSHeaders().getShort(string);
 
@@ -329,7 +327,7 @@
             return s;
     }
 
-    public int getInteger(String string) throws JMSException
+    public int getInteger(AMQShortString string) throws JMSException
     {
             Integer i = getJMSHeaders().getInteger(string);
 
@@ -341,7 +339,7 @@
             return i;
     }
 
-    public long getLong(String string) throws JMSException
+    public long getLong(AMQShortString string) throws JMSException
     {
             Long l = getJMSHeaders().getLong(string);
 
@@ -353,7 +351,7 @@
             return l;
     }
 
-    public float getFloat(String string) throws JMSException
+    public float getFloat(AMQShortString string) throws JMSException
     {
             Float f = getJMSHeaders().getFloat(string);
 
@@ -363,13 +361,13 @@
                 {
                     Object str = getJMSHeaders().getObject(string);
 
-                    if (str == null || !(str instanceof String))
+                    if (str == null || !(str instanceof AMQShortString))
                     {
                         throw new MessageFormatException("getFloat can't use " + string + " item.");
                     }
                     else
                     {
-                        return Float.valueOf((String) str);
+                        return Float.valueOf(((AMQShortString)str).asString());
                     }
                 }
                 else
@@ -382,7 +380,7 @@
             return f;
     }
 
-    public double getDouble(String string) throws JMSException
+    public double getDouble(AMQShortString string) throws JMSException
     {
             Double d = getJMSHeaders().getDouble(string);
 
@@ -394,9 +392,9 @@
             return d;
     }
 
-    public String getString(String string) throws JMSException
+    public AMQShortString getString(AMQShortString string) throws JMSException
     {
-        String s = getJMSHeaders().getString(string);
+        AMQShortString s = new AMQShortString(getJMSHeaders().getString(string.asString()));
 
         if (s == null)
         {
@@ -415,7 +413,7 @@
                     }
                     else
                     {
-                        s = String.valueOf(o);
+                        s = (AMQShortString) o;
                     }
                 }
             }
@@ -424,76 +422,76 @@
         return s;
     }
 
-    public Object getObject(String string) throws JMSException
+    public Object getObject(AMQShortString string) throws JMSException
     {
         return getJMSHeaders().getObject(string);
     }
 
-    public void setBoolean(String string, boolean b) throws JMSException
+    public void setBoolean(AMQShortString string, boolean b) throws JMSException
     {
         checkPropertyName(string);
         getJMSHeaders().setBoolean(string, b);
     }
 
-    public void setChar(String string, char c) throws JMSException
+    public void setChar(AMQShortString string, char c) throws JMSException
     {
         checkPropertyName(string);
         getJMSHeaders().setChar(string, c);
     }
 
-    public Object setBytes(String string, byte[] bytes)
+    public Object setBytes(AMQShortString string, byte[] bytes)
     {
         return getJMSHeaders().setBytes(string, bytes);
     }
 
-    public Object setBytes(String string, byte[] bytes, int start, int length)
+    public Object setBytes(AMQShortString string, byte[] bytes, int start, int length)
     {
         return getJMSHeaders().setBytes(string, bytes, start, length);
     }
 
-    public void setByte(String string, byte b) throws JMSException
+    public void setByte(AMQShortString string, byte b) throws JMSException
     {
         checkPropertyName(string);
         getJMSHeaders().setByte(string, b);
     }
 
-    public void setShort(String string, short i) throws JMSException
+    public void setShort(AMQShortString string, short i) throws JMSException
     {
         checkPropertyName(string);
         getJMSHeaders().setShort(string, i);
     }
 
-    public void setInteger(String string, int i) throws JMSException
+    public void setInteger(AMQShortString string, int i) throws JMSException
     {
         checkPropertyName(string);
         getJMSHeaders().setInteger(string, i);
     }
 
-    public void setLong(String string, long l) throws JMSException
+    public void setLong(AMQShortString string, long l) throws JMSException
     {
         checkPropertyName(string);
         getJMSHeaders().setLong(string, l);
     }
 
-    public void setFloat(String string, float v) throws JMSException
+    public void setFloat(AMQShortString string, float v) throws JMSException
     {
         checkPropertyName(string);
         getJMSHeaders().setFloat(string, v);
     }
 
-    public void setDouble(String string, double v) throws JMSException
+    public void setDouble(AMQShortString string, double v) throws JMSException
     {
         checkPropertyName(string);
         getJMSHeaders().setDouble(string, v);
     }
 
-    public void setString(String string, String string1) throws JMSException
+    public void setString(AMQShortString string, AMQShortString string1) throws JMSException
     {
         checkPropertyName(string);
-        getJMSHeaders().setString(string, string1);
+        getJMSHeaders().setString(string.asString(), string1.asString());
     }
 
-    public void setObject(String string, Object object) throws JMSException
+    public void setObject(AMQShortString string, Object object) throws JMSException
     {
         checkPropertyName(string);
         try
@@ -506,7 +504,7 @@
         }
     }
 
-    public boolean itemExists(String string) throws JMSException
+    public boolean itemExists(AMQShortString string) throws JMSException
     {
         return getJMSHeaders().containsKey(string);
     }
@@ -521,7 +519,7 @@
         getJMSHeaders().clear();
     }
 
-    public boolean propertyExists(String propertyName)
+    public boolean propertyExists(AMQShortString propertyName)
     {
         return getJMSHeaders().propertyExists(propertyName);
     }
@@ -531,7 +529,7 @@
         return getJMSHeaders().setObject(key.toString(), value);
     }
 
-    public Object remove(String propertyName)
+    public Object remove(AMQShortString propertyName)
     {
         return getJMSHeaders().remove(propertyName);
     }
@@ -637,35 +635,43 @@
 
     }
 
-	public String getTransactionId() {
+	public AMQShortString getTransactionId()
+    {
 		return _transactionId;
 	}
 
-	public void setTransactionId(String id) {
+	public void setTransactionId(AMQShortString id)
+    {
 		_transactionId = id;
 	}
 
-	public String getDestination() {
+	public AMQShortString getDestination()
+    {
 		return _destination;
 	}
 
-	public void setDestination(String destination) {
+	public void setDestination(AMQShortString destination)
+    {
 		this._destination = destination;
 	}
 
-	public String getExchange() {
+	public AMQShortString getExchange()
+    {
 		return _exchange;
 	}
 
-	public void setExchange(String exchange) {
+	public void setExchange(AMQShortString exchange)
+    {
 		this._exchange = exchange;
 	}
 
-	public String getRoutingKey() {
+	public AMQShortString getRoutingKey()
+    {
 		return _routingKey;
 	}
 
-	public void setRoutingKey(String routingKey) {
+	public void setRoutingKey(AMQShortString routingKey)
+    {
 		this._routingKey = routingKey;
 	}
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Wed Feb 14 12:02:03 2007
@@ -24,28 +24,69 @@
 import java.util.List;
 
 /**
- * This class contains everything needed to process a JMS message. It assembles the
- * deliver body, the content header and the content body/ies.
+ * This class contains everything needed to process a JMS message.
  *
  * Note that the actual work of creating a JMS message for the client code's use is done
  * outside of the MINA dispatcher thread in order to minimise the amount of work done in
  * the MINA dispatcher thread.
  *
  */
-public class UnprocessedMessage {
-	public int bytesReceived = 0;
+public class UnprocessedMessage
+{
+	private int bytesReceived = 0;
+	private int channelId;
+	private List<byte[]> contents = new LinkedList<byte[]>(); // content chunks in arrival order; typed to avoid a raw-type unchecked conversion
+	private long deliveryTag;
+	private MessageHeaders messageHeaders;
+    
+    public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders)
+    {
+        this.channelId = channelId;
+        this.deliveryTag = deliveryTag;
+        this.messageHeaders = messageHeaders;
+    }
+    
+    public UnprocessedMessage(int channelId, long deliveryTag, MessageHeaders messageHeaders, byte[] content)
+    {
+        this.channelId = channelId;
+        this.deliveryTag = deliveryTag;
+        this.messageHeaders = messageHeaders;
+        addContent(content);
+    }
 
-	public List contents = new LinkedList();
-
-	public int channelId;
-
-	public long deliveryTag;
-
-	public MessageHeaders contentHeader;
-
-	public void addContent(byte[] content) {
+	public void addContent(byte[] content)
+    {
 		contents.add(content);
-		bytesReceived = bytesReceived + content.length;
+		bytesReceived += content.length;
 	}
 
+    public int getBytesReceived()
+    {
+        return bytesReceived;
+    }
+
+    public int getChannelId()
+    {
+        return channelId;
+    }
+    
+    public List<byte[]> getContents()
+    {
+        return contents;
+    }
+
+    public long getDeliveryTag()
+    {
+        return deliveryTag;
+    }
+    
+    public MessageHeaders getMessageHeaders()
+    {
+        return messageHeaders;
+    }
+    
+    public String toString()
+    { // contents stays empty until addContent() is called, so the first-chunk lookup must be guarded
+        return "UnprocessedMessage: ch=" + channelId + "; bytesReceived=" + bytesReceived + "; deliveryTag=" + deliveryTag + "; MsgHdrs=" + messageHeaders + "; Num contents=" + contents.size() + "; First content=" + (contents.isEmpty() ? "<none>" : new String(contents.get(0)));
+    }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Wed Feb 14 12:02:03 2007
@@ -24,11 +24,15 @@
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.common.IoServiceConfig;
 import org.apache.mina.filter.SSLFilter;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
+import org.apache.qpid.pool.ReadWriteThreadModel;
+import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.failover.FailoverHandler;
@@ -37,11 +41,13 @@
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQCodecFactory;
+import org.apache.qpid.framing.AMQBody;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQRequestBody;
 import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.framing.HeartbeatBody;
@@ -54,6 +60,7 @@
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 
+
 public class AMQProtocolHandler extends IoHandlerAdapter
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
@@ -95,24 +102,11 @@
 
     private CountDownLatch _failoverLatch;
 
+    private static final long DEFAULT_SYNC_TIMEOUT = 1000 * 30; // default handed to blockForFrame(); presumably milliseconds (30 s) - confirm
+
     public AMQProtocolHandler(AMQConnection con)
     {
         _connection = con;
-
-        // We add a proxy for the state manager so that we can substitute the state manager easily in this class.
-        // We substitute the state manager when performing failover
-        _frameListeners.add(new AMQMethodListener()
-        {
-            public boolean methodReceived(AMQMethodEvent evt) throws AMQException
-            {
-                return _stateManager.methodReceived(evt);
-            }
-
-            public void error(Exception e)
-            {
-                _stateManager.error(e);
-            }
-        });
     }
 
     public boolean isUseSSL()
@@ -149,13 +143,25 @@
             session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
         }
 
+        try
+        {
+            // Create the read and write jobs for this session on the shared ReadWriteThreadModel filters.
+            ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+            threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+            threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+        }
+        catch (RuntimeException e) // best effort: the protocol session below is still created
+        {
+            _logger.error("Error creating read/write jobs for session " + session, e);
+        }
+  
         _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
         _protocolSession.init();
     }
 
     public void sessionOpened(IoSession session) throws Exception
     {
-        System.setProperty("foo", "bar");
+        //System.setProperty("foo", "bar");
     }
 
     /**
@@ -286,11 +292,14 @@
     public void propagateExceptionToWaiters(Exception e)
     {
         getStateManager().error(e);
-        final Iterator it = _frameListeners.iterator();
-        while (it.hasNext())
+        if (!_frameListeners.isEmpty())
         {
-            final AMQMethodListener ml = (AMQMethodListener) it.next();
-            ml.error(e);
+            final Iterator it = _frameListeners.iterator();
+            while (it.hasNext())
+            {
+                final AMQMethodListener ml = (AMQMethodListener) it.next();
+                ml.error(e);
+            }
         }
     }
 
@@ -298,59 +307,26 @@
 
     public void messageReceived(IoSession session, Object message) throws Exception
     {
+        final boolean debug = _logger.isDebugEnabled();
+        final long msgNumber = ++_messageReceivedCount;
 
-        if (_messageReceivedCount++ % 1000 == 0)
+        if (debug && (msgNumber % 1000 == 0))
         {
             _logger.debug("Received " + _messageReceivedCount + " protocol messages");
         }
-        Iterator it = _frameListeners.iterator();
-        AMQFrame frame = (AMQFrame) message;
 
-        HeartbeatDiagnostics.received(frame.bodyFrame instanceof HeartbeatBody);
+        AMQFrame frame = (AMQFrame) message;
+        final AMQBody bodyFrame = frame.getBodyFrame();
 
-        if (frame.bodyFrame instanceof AMQRequestBody)
+        if (bodyFrame instanceof AMQRequestBody)
         {   
-            _protocolSession.messageRequestBodyReceived(frame.channel, (AMQRequestBody)frame.bodyFrame);
+            _protocolSession.messageRequestBodyReceived(frame.getChannel(), (AMQRequestBody)bodyFrame);
         }
-        else if (frame.bodyFrame instanceof AMQResponseBody)
+        else if (bodyFrame instanceof AMQResponseBody)
         {
-            _protocolSession.messageResponseBodyReceived(frame.channel, (AMQResponseBody)frame.bodyFrame);
+            _protocolSession.messageResponseBodyReceived(frame.getChannel(), (AMQResponseBody)bodyFrame);
         }
-//         if (frame.bodyFrame instanceof AMQMethodBody)
-//         {
-//             if (_logger.isDebugEnabled())
-//             {
-//                 _logger.debug("Method frame received: " + frame);
-//             }
-// 
-//             final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel, (AMQMethodBody) frame.bodyFrame);
-//             try
-//             {
-//                 boolean wasAnyoneInterested = false;
-//                 Q
-//             }
-//             catch (AMQException e)
-//             {
-//                 it = _frameListeners.iterator();
-//                 while (it.hasNext())
-//                 {
-//                     final AMQMethodListener listener = (AMQMethodListener) it.next();
-//                     listener.error(e);
-//                 }
-//                 exceptionCaught(session, e);
-//             }
-//         }
-//         else if (frame.bodyFrame instanceof ContentHeaderBody)
-//         {
-//             _protocolSession.messageContentHeaderReceived(frame.channel,
-//                                                           (ContentHeaderBody) frame.bodyFrame);
-//         }
-//         else if (frame.bodyFrame instanceof ContentBody)
-//         {
-//             _protocolSession.messageContentBodyReceived(frame.channel,
-//                                                         (ContentBody) frame.bodyFrame);
-//         }
-        else if (frame.bodyFrame instanceof HeartbeatBody)
+        else if (bodyFrame instanceof HeartbeatBody)
         {
             _logger.debug("Received heartbeat");
         }
@@ -361,27 +337,32 @@
 
     public void messageSent(IoSession session, Object message) throws Exception
     {
-        if (_messagesOut++ % 1000 == 0)
+        final long sentMessages = _messagesOut++;
+
+        final boolean debug = _logger.isDebugEnabled();
+
+        if (debug && (sentMessages % 1000 == 0))
         {
             _logger.debug("Sent " + _messagesOut + " protocol messages");
         }
         _connection.bytesSent(session.getWrittenBytes());
-        if (_logger.isDebugEnabled())
+        if (debug)
         {
             _logger.debug("Sent frame " + message);
         }
     }
 
-    public void addFrameListener(AMQMethodListener listener)
-    {
-        _frameListeners.add(listener);
-    }
-
-    public void removeFrameListener(AMQMethodListener listener)
-    {
-        _frameListeners.remove(listener);
-    }
-
+    /*
+      public void addFrameListener(AMQMethodListener listener)
+      {
+          _frameListeners.add(listener);
+      }
+
+      public void removeFrameListener(AMQMethodListener listener)
+      {
+          _frameListeners.remove(listener);
+      }
+    */
     public void attainState(AMQState s) throws AMQException
     {
         getStateManager().attainState(s);
@@ -418,19 +399,36 @@
      * a particular response. Equivalent to calling getProtocolSession().write() then
      * waiting for the response.
      *
+     * @param channelNum
      * @param methodBody
-     * @param listener the blocking listener. Note the calling thread will block.
+     * @param listener The blocking listener. Note the calling thread will block.
      */
     private AMQMethodEvent writeCommandFrameAndWaitForReply(int channelNum, AMQMethodBody methodBody,
                                                             BlockingMethodFrameListener listener)
             throws AMQException
     {
+        return writeCommandFrameAndWaitForReply(channelNum, methodBody, listener, DEFAULT_SYNC_TIMEOUT);
+    }
+
+    /**
+     * Convenience method that writes a frame to the protocol session and waits for
+     * a particular response, passing {@code timeout} to the listener's blockForFrame.
+     * Equivalent to calling getProtocolSession().write() then waiting for the response.
+     *
+     * @param channelNum
+     * @param methodBody
+     * @param listener The blocking listener. Note the calling thread will block.
+     */
+    private AMQMethodEvent writeCommandFrameAndWaitForReply(int channelNum, AMQMethodBody methodBody,
+                                                            BlockingMethodFrameListener listener, long timeout)
+            throws AMQException
+    {
         try
         {
             _frameListeners.add(listener);
             _protocolSession.writeRequest(channelNum, methodBody, listener);
 
-            AMQMethodEvent e = listener.blockForFrame();
+            AMQMethodEvent e = listener.blockForFrame(timeout);
             return e;
             // When control resumes before this line, a reply will have been received
             // that matches the criteria defined in the blocking listener
@@ -440,7 +438,6 @@
             // If we don't remove the listener then no-one will
             _frameListeners.remove(listener);
         }
-
     }
 
     /**
@@ -484,19 +481,26 @@
     {
         getStateManager().changeState(AMQState.CONNECTION_CLOSING);
 
-        // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQMethodBody methodBody = ConnectionCloseBody.createMethodBody(
-            (byte)0, (byte)9,	// AMQP version (major, minor)
+            _protocolSession.getProtocolMajorVersion(), // AMQP major version
+            _protocolSession.getProtocolMinorVersion(), // AMQP minor version
             0,	// classId
             0,	// methodId
             AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
-            "JMS client is closing the connection.");	// replyText
-        
-        syncWrite(0, methodBody, ConnectionCloseOkBody.class);
+            new AMQShortString("JMS client is closing the connection."));	// replyText
+
+        try
+        {
+            syncWrite(0, methodBody, ConnectionCloseOkBody.class);
+            _protocolSession.closeProtocolSession();
+        }
+        catch (AMQTimeoutException e)
+        {
+            _protocolSession.closeProtocolSession(false);
+        }
+
 
-        _protocolSession.closeProtocolSession();
     }
 
     /**
@@ -531,7 +535,7 @@
         }
     }
 
-    public String generateQueueName()
+    public AMQShortString generateQueueName()
     {
         return _protocolSession.generateQueueName();
     }
@@ -567,7 +571,7 @@
         return _protocolSession;
     }
 
-    FailoverState getFailoverState()
+    public FailoverState getFailoverState()
     {
         return _failoverState;
     }
@@ -577,8 +581,18 @@
         _failoverState = failoverState;
     }
     
-    public int getConnectionId()
+    public long getConnectionId()
     {
         return _connection.getConnectionId();
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return _protocolSession.getProtocolMajorVersion();
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return _protocolSession.getProtocolMinorVersion();
     }
 }

Modified: incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Wed Feb 14 12:02:03 2007
@@ -37,28 +37,32 @@
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.ConnectionTuneParameters;
 import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.framing.AMQRequestBody;
 import org.apache.qpid.framing.AMQResponseBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.MainRegistry;
 import org.apache.qpid.framing.MessageAppendBody;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersionList;
 import org.apache.qpid.framing.RequestManager;
 import org.apache.qpid.framing.RequestResponseMappingException;
 import org.apache.qpid.framing.ResponseManager;
+import org.apache.qpid.framing.VersionSpecificRegistry;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.client.state.AMQStateManager;
 
 /**
  * Wrapper for protocol session that provides type-safe access to session attributes.
- *
+ * <p/>
  * The underlying protocol session is still available but clients should not
  * use it to obtain session attributes.
  */
-public class AMQProtocolSession implements AMQProtocolWriter, ProtocolVersionList
+public class AMQProtocolSession implements ProtocolVersionList, AMQVersionAwareProtocolSession
 {
 
     protected static final int LAST_WRITE_FUTURE_JOIN_TIMEOUT = 1000 * 60 * 2;
@@ -107,7 +111,12 @@
     protected int _queueId = 1;
     protected final Object _queueIdLock = new Object();
     
-    protected int _ConnectionId;
+    protected long _ConnectionId;
+
+    private byte _protocolMinorVersion;
+    private byte _protocolMajorVersion;
+    private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
+
 
     /**
      * No-arg constructor for use by test subclass - has to initialise final vars
@@ -130,6 +139,8 @@
         _protocolHandler = protocolHandler;
         _minaProtocolSession = protocolSession;
         // properties of the connection are made available to the event handlers
+        _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
+        //fixme - real value needed
         _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
         _stateManager = new AMQStateManager(this);
 
@@ -143,6 +154,7 @@
     {
         _protocolHandler = protocolHandler;
         _minaProtocolSession = protocolSession;
+        _minaProtocolSession.setAttachment(this);
         // properties of the connection are made available to the event handlers
         _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
 
@@ -221,8 +233,9 @@
 
     /**
      * Store the SASL client currently being used for the authentication handshake
+     *
      * @param client if non-null, stores this in the session. if null clears any existing client
-     * being stored
+     *               being stored
      */
     public void setSaslClient(SaslClient client)
     {
@@ -253,6 +266,7 @@
     /**
      * This is involed from MessageTransferMethodHandler if type is CONTENT_TYPE_REFERENCE
      * This is invoked on the MINA dispatcher thread.
+     *
      * @param message
      * @throws AMQException if this was not expected
      */
@@ -296,13 +310,14 @@
      * This is involed from MessageTransferMethodHandler if type is CONTENT_TYPE_INLINE
      * Deliver a message to the appropriate session, removing the unprocessed message
      * from our map
+     *
      * @param channelId the channel id the message should be delivered to
-     * @param msg the message
+     * @param msg       the message
      */
     public void deliverMessageToAMQSession(int channelId, UnprocessedMessage msg)
     {
         AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
-        msg.contentHeader.setSize(msg.bytesReceived);
+        msg.getMessageHeaders().setSize(msg.getBytesReceived());
         session.messageReceived(msg);
     }
     
@@ -360,6 +375,7 @@
         WriteFuture f = _minaProtocolSession.write(frame);
         if (wait)
         {
+            //fixme -- time out?
             f.join();
         }
         else
@@ -404,6 +420,7 @@
 
     /**
      * Starts the process of closing a session
+     *
      * @param session the AMQSession being closed
      */
     public void closeSession(AMQSession session)
@@ -425,19 +442,27 @@
      * This method decides whether this is a response or an initiation. The latter
      * case causes the AMQSession to be closed and an exception to be thrown if
      * appropriate.
+     *
      * @param channelId the id of the channel (session)
      * @return true if the client must respond to the server, i.e. if the server
-     * initiated the channel close, false if the channel close is just the server
-     * responding to the client's earlier request to close the channel.
+     *         initiated the channel close, false if the channel close is just the server
+     *         responding to the client's earlier request to close the channel.
      */
-    public boolean channelClosed(int channelId, int code, String text)
+    public boolean channelClosed(int channelId, int code, String text) throws AMQException
     {
         final Integer chId = channelId;
         // if this is not a response to an earlier request to close the channel
         if (_closingChannels.remove(chId) == null)
         {
             final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
-            session.closed(new AMQException(_logger, code, text));
+            try
+            {
+                session.closed(new AMQException(_logger, code, text));
+            }
+            catch (JMSException e)
+            {
+                throw new AMQException("JMSException received while closing session", e);
+            }
             return true;
         }
         else
@@ -453,15 +478,20 @@
 
     public void closeProtocolSession()
     {
+        closeProtocolSession(true);
+    }
+
+    public void closeProtocolSession(boolean waitLast)
+    {
         _logger.debug("Waiting for last write to join.");
-        if (_lastWriteFuture != null)
+        if (waitLast && _lastWriteFuture != null)
         {
             _lastWriteFuture.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
         }
 
         _logger.debug("Closing protocol session");
         final CloseFuture future = _minaProtocolSession.close();
-        future.join();
+        future.join(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
     }
 
     public void failover(String host, int port)
@@ -469,20 +499,19 @@
         _protocolHandler.failover(host, port);
     }
 
-    protected String generateQueueName()
+    protected AMQShortString generateQueueName()
     {
         int id;
-        synchronized(_queueIdLock)
+        synchronized (_queueIdLock)
         {
             id = _queueId++;
         }
         //get rid of / and : and ; from address for spec conformance
-        String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(),"/;:","");
-        return "tmp_" + localAddress + "_" + id;
+        String localAddress = StringUtils.replaceChars(_minaProtocolSession.getLocalAddress().toString(), "/;:", "");
+        return new AMQShortString("tmp_" + localAddress + "_" + id);
     }
 
     /**
-     *
      * @param delay delay in seconds (not ms)
      */
     void initHeartbeats(int delay)
@@ -495,11 +524,39 @@
         }
     }
 
-    public void confirmConsumerCancelled(int channelId, String consumerTag)
+    public void confirmConsumerCancelled(int channelId, AMQShortString consumerTag)
     {
         final Integer chId = channelId;
         final AMQSession session = (AMQSession) _channelId2SessionMap.get(chId);
 
         session.confirmConsumerCancelled(consumerTag);
     }
+
+    public void setProtocolVersion(final byte versionMajor, final byte versionMinor)
+    {
+        _protocolMajorVersion = versionMajor;
+        _protocolMinorVersion = versionMinor;
+        _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);        
+    }
+
+    public byte getProtocolMinorVersion()
+    {
+        return _protocolMinorVersion;
+    }
+
+    public byte getProtocolMajorVersion()
+    {
+        return _protocolMajorVersion;
+    }
+    
+    public boolean isProtocolVersionEqual(byte major, byte minor)
+    { // true only when both parts match: major against major, minor against minor
+        return _protocolMajorVersion == major && _protocolMinorVersion == minor;
+    }
+    
+    public VersionSpecificRegistry getRegistry()
+    {
+        return _registry;
+    }
+
 }