You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2008/08/15 05:41:24 UTC

svn commit: r686136 [13/17] - in /incubator/qpid/branches/qpid.0-10/java: ./ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/diagnostic/ broker-plugins/src/main/java/org/apache/qpid/extras/exchanges/example/ broker/bin/ broker/etc/ broker...

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Thu Aug 14 20:40:49 2008
@@ -27,31 +27,32 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ClientMethodDispatcherImpl implements MethodDispatcher
 {
 
-
-    private static final BasicCancelOkMethodHandler      _basicCancelOkMethodHandler      = BasicCancelOkMethodHandler.getInstance();
-    private static final BasicDeliverMethodHandler       _basicDeliverMethodHandler       = BasicDeliverMethodHandler.getInstance();
-    private static final BasicReturnMethodHandler        _basicReturnMethodHandler        = BasicReturnMethodHandler.getInstance();
-    private static final ChannelCloseMethodHandler       _channelCloseMethodHandler       = ChannelCloseMethodHandler.getInstance();
-    private static final ChannelFlowOkMethodHandler      _channelFlowOkMethodHandler      = ChannelFlowOkMethodHandler.getInstance();
-    private static final ConnectionCloseMethodHandler    _connectionCloseMethodHandler    = ConnectionCloseMethodHandler.getInstance();
-    private static final ConnectionOpenOkMethodHandler   _connectionOpenOkMethodHandler   = ConnectionOpenOkMethodHandler.getInstance();
+    private static final BasicCancelOkMethodHandler _basicCancelOkMethodHandler = BasicCancelOkMethodHandler.getInstance();
+    private static final BasicDeliverMethodHandler _basicDeliverMethodHandler = BasicDeliverMethodHandler.getInstance();
+    private static final BasicReturnMethodHandler _basicReturnMethodHandler = BasicReturnMethodHandler.getInstance();
+    private static final ChannelCloseMethodHandler _channelCloseMethodHandler = ChannelCloseMethodHandler.getInstance();
+    private static final ChannelFlowOkMethodHandler _channelFlowOkMethodHandler = ChannelFlowOkMethodHandler.getInstance();
+    private static final ConnectionCloseMethodHandler _connectionCloseMethodHandler = ConnectionCloseMethodHandler.getInstance();
+    private static final ConnectionOpenOkMethodHandler _connectionOpenOkMethodHandler = ConnectionOpenOkMethodHandler.getInstance();
     private static final ConnectionRedirectMethodHandler _connectionRedirectMethodHandler = ConnectionRedirectMethodHandler.getInstance();
-    private static final ConnectionSecureMethodHandler   _connectionSecureMethodHandler   = ConnectionSecureMethodHandler.getInstance();
-    private static final ConnectionStartMethodHandler    _connectionStartMethodHandler    = ConnectionStartMethodHandler.getInstance();
-    private static final ConnectionTuneMethodHandler     _connectionTuneMethodHandler     = ConnectionTuneMethodHandler.getInstance();
-    private static final ExchangeBoundOkMethodHandler    _exchangeBoundOkMethodHandler    = ExchangeBoundOkMethodHandler.getInstance();
-    private static final QueueDeleteOkMethodHandler      _queueDeleteOkMethodHandler      = QueueDeleteOkMethodHandler.getInstance();
-
+    private static final ConnectionSecureMethodHandler _connectionSecureMethodHandler = ConnectionSecureMethodHandler.getInstance();
+    private static final ConnectionStartMethodHandler _connectionStartMethodHandler = ConnectionStartMethodHandler.getInstance();
+    private static final ConnectionTuneMethodHandler _connectionTuneMethodHandler = ConnectionTuneMethodHandler.getInstance();
+    private static final ExchangeBoundOkMethodHandler _exchangeBoundOkMethodHandler = ExchangeBoundOkMethodHandler.getInstance();
+    private static final QueueDeleteOkMethodHandler _queueDeleteOkMethodHandler = QueueDeleteOkMethodHandler.getInstance();
 
+    private static final Logger _logger = LoggerFactory.getLogger(ClientMethodDispatcherImpl.class);
 
     private static interface DispatcherFactory
     {
-        public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager);
+        public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session);
     }
 
     private static final Map<ProtocolVersion, DispatcherFactory> _dispatcherFactories =
@@ -62,44 +63,44 @@
         _dispatcherFactories.put(ProtocolVersion.v8_0,
                                  new DispatcherFactory()
                                  {
-                                     public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+                                     public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session)
                                      {
-                                         return new ClientMethodDispatcherImpl_8_0(stateManager);
+                                         return new ClientMethodDispatcherImpl_8_0(session);
                                      }
                                  });
 
         _dispatcherFactories.put(ProtocolVersion.v0_9,
                                  new DispatcherFactory()
                                  {
-                                     public ClientMethodDispatcherImpl createMethodDispatcher(AMQStateManager stateManager)
+                                     public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session)
                                      {
-                                         return new ClientMethodDispatcherImpl_0_9(stateManager);
+                                         return new ClientMethodDispatcherImpl_0_9(session);
                                      }
                                  });
 
     }
 
-
-    public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQStateManager stateManager)
+    public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session)
     {
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("New Method Dispatcher:" + session);
+        }
+        
         DispatcherFactory factory = _dispatcherFactories.get(version);
-        return factory.createMethodDispatcher(stateManager);
+        return factory.createMethodDispatcher(session);
     }
-    
 
+    AMQProtocolSession _session;
 
-
-    private AMQStateManager _stateManager;
-
-    public ClientMethodDispatcherImpl(AMQStateManager stateManager)
+    public ClientMethodDispatcherImpl(AMQProtocolSession session)
     {
-        _stateManager = stateManager;
+        _session = session;
     }
 
-
     public AMQStateManager getStateManager()
     {
-        return _stateManager;
+        return _session.getStateManager();
     }
 
     public boolean dispatchAccessRequestOk(AccessRequestOkBody body, int channelId) throws AMQException
@@ -109,7 +110,7 @@
 
     public boolean dispatchBasicCancelOk(BasicCancelOkBody body, int channelId) throws AMQException
     {
-        _basicCancelOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        _basicCancelOkMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -120,7 +121,7 @@
 
     public boolean dispatchBasicDeliver(BasicDeliverBody body, int channelId) throws AMQException
     {
-        _basicDeliverMethodHandler.methodReceived(_stateManager,body,channelId);
+        _basicDeliverMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -141,13 +142,13 @@
 
     public boolean dispatchBasicReturn(BasicReturnBody body, int channelId) throws AMQException
     {
-        _basicReturnMethodHandler.methodReceived(_stateManager,body,channelId);
+        _basicReturnMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchChannelClose(ChannelCloseBody body, int channelId) throws AMQException
     {
-        _channelCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+        _channelCloseMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -163,7 +164,7 @@
 
     public boolean dispatchChannelFlowOk(ChannelFlowOkBody body, int channelId) throws AMQException
     {
-        _channelFlowOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        _channelFlowOkMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -174,7 +175,7 @@
 
     public boolean dispatchConnectionClose(ConnectionCloseBody body, int channelId) throws AMQException
     {
-        _connectionCloseMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionCloseMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -185,37 +186,37 @@
 
     public boolean dispatchConnectionOpenOk(ConnectionOpenOkBody body, int channelId) throws AMQException
     {
-        _connectionOpenOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionOpenOkMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchConnectionRedirect(ConnectionRedirectBody body, int channelId) throws AMQException
     {
-        _connectionRedirectMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionRedirectMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchConnectionSecure(ConnectionSecureBody body, int channelId) throws AMQException
     {
-        _connectionSecureMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionSecureMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchConnectionStart(ConnectionStartBody body, int channelId) throws AMQException
     {
-        _connectionStartMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionStartMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchConnectionTune(ConnectionTuneBody body, int channelId) throws AMQException
     {
-        _connectionTuneMethodHandler.methodReceived(_stateManager,body,channelId);
+        _connectionTuneMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
     public boolean dispatchQueueDeleteOk(QueueDeleteOkBody body, int channelId) throws AMQException
     {
-        _queueDeleteOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        _queueDeleteOkMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -431,7 +432,7 @@
 
     public boolean dispatchExchangeBoundOk(ExchangeBoundOkBody body, int channelId) throws AMQException
     {
-        _exchangeBoundOkMethodHandler.methodReceived(_stateManager,body,channelId);
+        _exchangeBoundOkMethodHandler.methodReceived(_session, body, channelId);
         return true;
     }
 
@@ -522,7 +523,7 @@
 
     public boolean dispatchTxSelectOk(TxSelectOkBody body, int channelId) throws AMQException
     {
-        return false;  
+        return false;
     }
 
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java Thu Aug 14 20:40:49 2008
@@ -26,16 +26,15 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.AMQMethodNotImplementedException;
-
+import org.apache.qpid.client.protocol.AMQProtocolSession;
 
 public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
 {
-    public ClientMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+    public ClientMethodDispatcherImpl_0_9(AMQProtocolSession session)
     {
-        super(stateManager);
+        super(session);
     }
 
-
     public boolean dispatchBasicRecoverSyncOk(BasicRecoverSyncOkBody body, int channelId) throws AMQException
     {
         return false;
@@ -148,8 +147,7 @@
 
     public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
     {
-        return false;  
+        return false;
     }
 
-
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_8_0.java Thu Aug 14 20:40:49 2008
@@ -24,13 +24,13 @@
 import org.apache.qpid.framing.amqp_8_0.MethodDispatcher_8_0;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
 
 public class ClientMethodDispatcherImpl_8_0 extends ClientMethodDispatcherImpl implements MethodDispatcher_8_0
 {
-    public ClientMethodDispatcherImpl_8_0(AMQStateManager stateManager)
+    public ClientMethodDispatcherImpl_8_0(AMQProtocolSession session)
     {
-        super(stateManager);
+        super(session);
     }
 
     public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -25,13 +25,11 @@
 import org.apache.qpid.client.AMQAuthenticationException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ConnectionCloseOkBody;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,14 +46,13 @@
     }
 
     private ConnectionCloseMethodHandler()
-    { }
+    {
+    }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody method, int channelId)
-                throws AMQException
+    public void methodReceived(AMQProtocolSession session, ConnectionCloseBody method, int channelId)
+            throws AMQException
     {
         _logger.info("ConnectionClose frame received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
-        
 
         // does it matter
         // stateManager.changeState(AMQState.CONNECTION_CLOSING);
@@ -63,6 +60,8 @@
         AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
         AMQShortString reason = method.getReplyText();
 
+        AMQException error = null;
+
         try
         {
 
@@ -75,35 +74,33 @@
             {
                 if (errorCode == AMQConstant.NOT_ALLOWED || (errorCode == AMQConstant.ACCESS_REFUSED))
                 {
-                    _logger.info("Error :" + errorCode +":"+ Thread.currentThread().getName());
-
-                    // todo ritchiem : Why do this here when it is going to be done in the finally block?
-                    session.closeProtocolSession();
+                    _logger.info("Error :" + errorCode + ":" + Thread.currentThread().getName());
 
-                    // todo this is a bit of a fudge (could be conssidered such as each new connection needs a new state manager or at least a fresh state.
-                    stateManager.changeState(AMQState.CONNECTION_NOT_STARTED);
-
-                    throw new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
+                    error = new AMQAuthenticationException(errorCode, reason == null ? null : reason.toString(), null);
                 }
                 else
                 {
                     _logger.info("Connection close received with error code " + errorCode);
 
-                    throw new AMQConnectionClosedException(errorCode, "Error: " + reason, null);
+                    error = new AMQConnectionClosedException(errorCode, "Error: " + reason, null);
                 }
             }
         }
         finally
         {
-            // this actually closes the connection in the case where it is not an error.
 
+            if (error != null)
+            {
+                session.notifyError(error);
+            }            
+
+            // Close the protocol Session, including any open TCP connections 
             session.closeProtocolSession();
 
-            // ritchiem: Doing this though will cause any waiting connection start to be released without being able to
-            // see what the cause was.
-            stateManager.changeState(AMQState.CONNECTION_CLOSED);
+            // Closing the session should not introduce a race condition as this thread will continue to propagate any
+            // exception into the exceptionCaught method of the SessionHandler.
+            // Any sessionClosed event should occur after this.
         }
     }
 
-
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -24,9 +24,7 @@
 import org.apache.qpid.framing.ConnectionOpenOkBody;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 public class ConnectionOpenOkMethodHandler implements StateAwareMethodListener<ConnectionOpenOkBody>
 {
@@ -41,10 +39,10 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionOpenOkBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ConnectionOpenOkBody body, int channelId)
                 throws AMQException            
     {
-        stateManager.changeState(AMQState.CONNECTION_OPEN);
+        session.getStateManager().changeState(AMQState.CONNECTION_OPEN);
     }
 
 

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,10 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.ConnectionRedirectBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,11 +44,10 @@
     private ConnectionRedirectMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionRedirectBody method, int channelId)
+    public void methodReceived(AMQProtocolSession session, ConnectionRedirectBody method, int channelId)
             throws AMQException
     {
         _logger.info("ConnectionRedirect frame received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();         
 
         String host = method.getHost().toString();
         // the host is in the form hostname:port with the port being optional

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -25,12 +25,9 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ConnectionSecureBody;
 import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 public class ConnectionSecureMethodHandler implements StateAwareMethodListener<ConnectionSecureBody>
 {
@@ -41,10 +38,9 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionSecureBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ConnectionSecureBody body, int channelId)
                 throws AMQException
     {
-        final AMQProtocolSession session = stateManager.getProtocolSession(); 
         SaslClient client = session.getSaslClient();
         if (client == null)
         {

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -25,7 +25,6 @@
 import org.apache.qpid.client.security.AMQCallbackHandler;
 import org.apache.qpid.client.security.CallbackHandlerRegistry;
 import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.common.ClientProperties;
 import org.apache.qpid.common.QpidProperties;
@@ -35,7 +34,6 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,15 +60,12 @@
     private ConnectionStartMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionStartBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ConnectionStartBody body, int channelId)
             throws AMQException
     {
         _log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, "
             + "AMQMethodEvent evt): called");
 
-        final AMQProtocolSession session = stateManager.getProtocolSession();
-
-
         ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(), (byte) body.getVersionMinor());
 
         // For the purposes of interop, we can make the client accept the broker's version string.
@@ -145,7 +140,7 @@
                     throw new AMQException(null, "No locales sent from server, passed: " + locales, null);
                 }
 
-                stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+                session.getStateManager().changeState(AMQState.CONNECTION_NOT_TUNED);
                 FieldTable clientProperties = FieldTableFactory.newFieldTable();
 
                 clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()),

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -24,10 +24,8 @@
 import org.apache.qpid.client.ConnectionTuneParameters;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.*;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,11 +44,10 @@
     protected ConnectionTuneMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ConnectionTuneBody frame, int channelId)
+    public void methodReceived(AMQProtocolSession session, ConnectionTuneBody frame, int channelId)
                 throws AMQException
     {
         _logger.debug("ConnectionTune frame received");
-        final AMQProtocolSession session = stateManager.getProtocolSession();
         final MethodRegistry methodRegistry = session.getMethodRegistry();
 
 
@@ -65,7 +62,7 @@
         params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
         session.setConnectionTuneParameters(params);
 
-        stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+        session.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
 
         ConnectionTuneOkBody tuneOkBody = methodRegistry.createConnectionTuneOkBody(params.getChannelMax(),
                                                                                     params.getFrameMax(),

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,10 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +44,7 @@
     private ExchangeBoundOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, ExchangeBoundOkBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, ExchangeBoundOkBody body, int channelId)
             throws AMQException
     {
         if (_logger.isDebugEnabled())

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java Thu Aug 14 20:40:49 2008
@@ -22,10 +22,8 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.QueueDeleteOkBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,7 +44,7 @@
     private QueueDeleteOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, QueueDeleteOkBody body, int channelId)
+    public void methodReceived(AMQProtocolSession session, QueueDeleteOkBody body, int channelId)
             throws AMQException
     {        
         if (_logger.isDebugEnabled())

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Thu Aug 14 20:40:49 2008
@@ -31,7 +31,6 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
 
 /**
  * @author Apache Software Foundation
@@ -44,21 +43,21 @@
      */
     private static final int DEFAULT_BUFFER_INITIAL_SIZE = 1024;
 
-    AbstractBytesMessage()
+    AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory)
     {
-        this(null);
+        this(delegateFactory, null);
     }
 
     /**
      * Construct a bytes message with existing data.
      *
+     * @param delegateFactory the message delegate factory, passed through to the superclass constructor
      * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
-     *             set to auto expand
      */
-    AbstractBytesMessage(ByteBuffer data)
+    AbstractBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
     {
-        super(data); // this instanties a content header
-        getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
+        super(delegateFactory, data); // this instantiates a content header
+        setContentType(getMimeType());
 
         if (_data == null)
         {
@@ -72,13 +71,12 @@
         _data.setAutoExpand(true);
     }
 
-    AbstractBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
-        AMQShortString routingKey, ByteBuffer data) throws AMQException
-    {
-        // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
-        super(messageNbr, contentHeader, exchange, routingKey, data);
-        getContentHeaderProperties().setContentType(getMimeTypeAsShortString());
-    }
+    AbstractBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+     {
+         super(delegate, data);
+         setContentType(getMimeType());
+     }
+
 
     public void clearBodyImpl() throws JMSException
     {

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java Thu Aug 14 20:40:49 2008
@@ -33,7 +33,6 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 /**
@@ -70,27 +69,28 @@
      */
     private int _byteArrayRemaining = -1;
 
-    AbstractBytesTypedMessage()
+    AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory)
     {
-        this(null);
+
+        this(delegateFactory, null);
     }
 
     /**
      * Construct a stream message with existing data.
      *
+     * @param delegateFactory the message delegate factory, passed through to the superclass constructor
      * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
-     *             set to auto expand
      */
-    AbstractBytesTypedMessage(ByteBuffer data)
+    AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
     {
-        super(data); // this instanties a content header
-    }
 
+        super(delegateFactory, data); // this instantiates a content header
+    }
 
-    AbstractBytesTypedMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
-                              AMQShortString routingKey, ByteBuffer data) throws AMQException
+    AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-        super(messageNbr, contentHeader, exchange, routingKey, data);
+
+        super(delegate, data);
     }
 
 

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu Aug 14 20:40:49 2008
@@ -21,10 +21,7 @@
 package org.apache.qpid.client.message;
 
 import java.io.IOException;
-import java.net.URISyntaxException;
-import java.util.Collections;
 import java.util.Enumeration;
-import java.util.Map;
 import java.util.UUID;
 
 import javax.jms.Destination;
@@ -32,122 +29,48 @@
 import javax.jms.MessageNotReadableException;
 import javax.jms.MessageNotWriteableException;
 
-import org.apache.commons.collections.map.ReferenceMap;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.AMQUndefinedDestination;
-import org.apache.qpid.client.BasicMessageConsumer;
-import org.apache.qpid.client.CustomJMSXProperty;
-import org.apache.qpid.client.JMSAMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.qpid.url.BindingURL;
 
-public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
+public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
 {
-    private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
 
-    public static final String JMS_TYPE = "x-jms-type";
 
-    protected boolean _redelivered;
 
     protected ByteBuffer _data;
-    private boolean _readableProperties = false;
     protected boolean _readableMessage = false;
     protected boolean _changedData = true;
-    private Destination _destination;
-    private JMSHeaderAdapter _headerAdapter;
-    private static final boolean STRICT_AMQP_COMPLIANCE =
-            Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
 
-    /**
-     * This is 0_10 specific
-     */
-    private org.apache.qpidity.api.Message _010message = null;
+    /** The session, required when the acknowledge mode is CLIENT_ACKNOWLEDGE, is accessed through the delegate (see setAMQSession). */
 
-    public void set010Message(org.apache.qpidity.api.Message m )
-    {
-        _010message = m;
-    }
-
-    public void dataChanged()
-    {
-        if (_010message != null)
-        {
-            _010message.clearData();
-            try
-            {
-                if (_data != null)
-                {
-                    _010message.appendData(_data.buf().slice());
-                }
-                else
-                {
-                    _010message.appendData(java.nio.ByteBuffer.allocate(0));
-                }
-            }
-            catch (IOException e)
-            {
-                throw new RuntimeException(e);
-            }
-        }
-    }
 
-    /**
-     * End 010 specific
-     */
 
-    public org.apache.qpidity.api.Message get010Message()
-    {
-        return _010message;
-    }
 
+    private AMQMessageDelegate _delegate;
+    private boolean _redelivered;
 
-    protected AbstractJMSMessage(ByteBuffer data)
+    protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
     {
-        super(new BasicContentHeaderProperties());
+        _delegate = delegateFactory.createDelegate();
         _data = data;
         if (_data != null)
         {
             _data.acquire();
         }
 
-        _readableProperties = false;
+
         _readableMessage = (data != null);
         _changedData = (data == null);
-        _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
 
     }
 
-    protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
-        AMQShortString routingKey, ByteBuffer data) throws AMQException
+    protected AbstractJMSMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-        this(contentHeader, deliveryTag);
-
-        Integer type = contentHeader.getHeaders().getInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
 
-        AMQDestination dest;
-
-        if (AMQDestination.QUEUE_TYPE.equals(type))
-        {
-            dest = new AMQQueue(exchange, routingKey, routingKey);
-        }
-        else if (AMQDestination.TOPIC_TYPE.equals(type))
-        {
-            dest = new AMQTopic(exchange, routingKey, null);
-        }
-        else
-        {
-            dest = new AMQUndefinedDestination(exchange, routingKey, null);
-        }
-        // Destination dest = AMQDestination.createDestination(url);
-        setJMSDestination(dest);
+        _delegate = delegate;
 
         _data = data;
         if (_data != null)
@@ -159,126 +82,82 @@
 
     }
 
-    protected AbstractJMSMessage(BasicContentHeaderProperties contentHeader, long deliveryTag)
+    public String getJMSMessageID() throws JMSException
     {
-        super(contentHeader, deliveryTag);
-        _readableProperties = (_contentHeaderProperties != null);
-        _headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
+        return _delegate.getJMSMessageID();
     }
 
-    public String getJMSMessageID() throws JMSException
+    public void setJMSMessageID(String messageId) throws JMSException
     {
-        return getContentHeaderProperties().getMessageIdAsString();
+        _delegate.setJMSMessageID(messageId);
     }
 
-    public void setJMSMessageID(String messageId) throws JMSException
+    public void setJMSMessageID(UUID messageId) throws JMSException
     {
-        getContentHeaderProperties().setMessageId(messageId);
+        _delegate.setJMSMessageID(messageId);
     }
 
+
     public long getJMSTimestamp() throws JMSException
     {
-        return getContentHeaderProperties().getTimestamp();
+        return _delegate.getJMSTimestamp();
     }
 
     public void setJMSTimestamp(long timestamp) throws JMSException
     {
-        getContentHeaderProperties().setTimestamp(timestamp);
+        _delegate.setJMSTimestamp(timestamp);
     }
 
     public byte[] getJMSCorrelationIDAsBytes() throws JMSException
     {
-        return getContentHeaderProperties().getCorrelationIdAsString().getBytes();
+        return _delegate.getJMSCorrelationIDAsBytes();
     }
 
     public void setJMSCorrelationIDAsBytes(byte[] bytes) throws JMSException
     {
-        getContentHeaderProperties().setCorrelationId(new String(bytes));
+        _delegate.setJMSCorrelationIDAsBytes(bytes);
     }
 
     public void setJMSCorrelationID(String correlationId) throws JMSException
     {
-        getContentHeaderProperties().setCorrelationId(correlationId);
+        _delegate.setJMSCorrelationID(correlationId);
     }
 
     public String getJMSCorrelationID() throws JMSException
     {
-        return getContentHeaderProperties().getCorrelationIdAsString();
+        return _delegate.getJMSCorrelationID();
     }
 
     public Destination getJMSReplyTo() throws JMSException
     {
-        String replyToEncoding = getContentHeaderProperties().getReplyToAsString();
-        if (replyToEncoding == null)
-        {
-            return null;
-        }
-        else
-        {
-            Destination dest = (Destination) _destinationCache.get(replyToEncoding);
-            if (dest == null)
-            {
-                try
-                {
-                    BindingURL binding = new AMQBindingURL(replyToEncoding);
-                    dest = AMQDestination.createDestination(binding);
-                }
-                catch (URISyntaxException e)
-                {
-                    throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
-                }
-
-                _destinationCache.put(replyToEncoding, dest);
-            }
-
-            return dest;
-        }
+        return _delegate.getJMSReplyTo();
     }
 
     public void setJMSReplyTo(Destination destination) throws JMSException
     {
-        if (destination == null)
-        {
-            throw new IllegalArgumentException("Null destination not allowed");
-        }
-
-        if (!(destination instanceof AMQDestination))
-        {
-            throw new IllegalArgumentException(
-                "ReplyTo destination may only be an AMQDestination - passed argument was type " + destination.getClass());
-        }
-
-        final AMQDestination amqd = (AMQDestination) destination;
-
-        final AMQShortString encodedDestination = amqd.getEncodedName();
-        _destinationCache.put(encodedDestination, destination);
-        getContentHeaderProperties().setReplyTo(encodedDestination);
+        _delegate.setJMSReplyTo(destination);
     }
 
     public Destination getJMSDestination() throws JMSException
     {
-        return _destination;
+        return _delegate.getJMSDestination();
     }
 
     public void setJMSDestination(Destination destination)
     {
-        _destination = destination;
+        _delegate.setJMSDestination(destination);
     }
 
     public int getJMSDeliveryMode() throws JMSException
     {
-        return getContentHeaderProperties().getDeliveryMode();
+        return _delegate.getJMSDeliveryMode();
     }
 
     public void setJMSDeliveryMode(int i) throws JMSException
     {
-        getContentHeaderProperties().setDeliveryMode((byte) i);
+        _delegate.setJMSDeliveryMode(i);
     }
 
-    public BasicContentHeaderProperties getContentHeaderProperties()
-    {
-        return (BasicContentHeaderProperties) _contentHeaderProperties;
-    }
 
     public boolean getJMSRedelivered() throws JMSException
     {
@@ -290,318 +169,180 @@
         _redelivered = b;
     }
 
+
     public String getJMSType() throws JMSException
     {
-        return getContentHeaderProperties().getTypeAsString();
+        return _delegate.getJMSType();
     }
 
     public void setJMSType(String string) throws JMSException
     {
-        getContentHeaderProperties().setType(string);
+        _delegate.setJMSType(string);
     }
 
     public long getJMSExpiration() throws JMSException
     {
-        return getContentHeaderProperties().getExpiration();
+        return _delegate.getJMSExpiration();
     }
 
     public void setJMSExpiration(long l) throws JMSException
     {
-        getContentHeaderProperties().setExpiration(l);
+        _delegate.setJMSExpiration(l);
     }
 
     public int getJMSPriority() throws JMSException
     {
-        return getContentHeaderProperties().getPriority();
+        return _delegate.getJMSPriority();
     }
 
     public void setJMSPriority(int i) throws JMSException
     {
-        getContentHeaderProperties().setPriority((byte) i);
-    }
-
-    public void clearProperties() throws JMSException
-    {
-        getJmsHeaders().clear();
-
-        _readableProperties = false;
-    }
-
-    public void clearBody() throws JMSException
-    {
-        clearBodyImpl();
-        _readableMessage = false;
+        _delegate.setJMSPriority(i);
     }
 
-    public boolean propertyExists(AMQShortString propertyName) throws JMSException
-    {
-        return getJmsHeaders().propertyExists(propertyName);
-    }
 
     public boolean propertyExists(String propertyName) throws JMSException
     {
-        return getJmsHeaders().propertyExists(propertyName);
+        return _delegate.propertyExists(propertyName);
     }
 
-    public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
+    public boolean getBooleanProperty(final String s)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        return getJmsHeaders().getBoolean(propertyName);
+        return _delegate.getBooleanProperty(s);
     }
 
-    public boolean getBooleanProperty(String propertyName) throws JMSException
+    public byte getByteProperty(final String s)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        return getJmsHeaders().getBoolean(propertyName);
+        return _delegate.getByteProperty(s);
     }
 
-    public byte getByteProperty(String propertyName) throws JMSException
+    public short getShortProperty(final String s)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        return getJmsHeaders().getByte(propertyName);
+        return _delegate.getShortProperty(s);
     }
 
-    public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
+    public int getIntProperty(final String s)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        return getJmsHeaders().getBytes(propertyName);
+        return _delegate.getIntProperty(s);
     }
 
-    public short getShortProperty(String propertyName) throws JMSException
+    public long getLongProperty(final String s)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        return getJmsHeaders().getShort(propertyName);
+        return _delegate.getLongProperty(s);
     }
 
-    public int getIntProperty(String propertyName) throws JMSException
+    public float getFloatProperty(final String s)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        return getJmsHeaders().getInteger(propertyName);
+        return _delegate.getFloatProperty(s);
     }
 
-    public long getLongProperty(String propertyName) throws JMSException
+    public double getDoubleProperty(final String s)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        return getJmsHeaders().getLong(propertyName);
-    }
-
-    public float getFloatProperty(String propertyName) throws JMSException
-    {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        return getJmsHeaders().getFloat(propertyName);
+        return _delegate.getDoubleProperty(s);
     }
 
-    public double getDoubleProperty(String propertyName) throws JMSException
+    public String getStringProperty(final String s)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        return getJmsHeaders().getDouble(propertyName);
+        return _delegate.getStringProperty(s);
     }
 
-    public String getStringProperty(String propertyName) throws JMSException
+    public Object getObjectProperty(final String s)
+            throws JMSException
     {
-        //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
-        if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
-        {
-            return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
-        }
-        else
-        {
-            if (STRICT_AMQP_COMPLIANCE)
-            {
-                throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-            }
-
-            return getJmsHeaders().getString(propertyName);
-        }
+        return _delegate.getObjectProperty(s);
     }
 
-    public Object getObjectProperty(String propertyName) throws JMSException
+    public Enumeration getPropertyNames()
+            throws JMSException
     {
-        return getJmsHeaders().getObject(propertyName);
+        return _delegate.getPropertyNames();
     }
 
-    public Enumeration getPropertyNames() throws JMSException
+    public void setBooleanProperty(final String s, final boolean b)
+            throws JMSException
     {
-        return getJmsHeaders().getPropertyNames();
+        _delegate.setBooleanProperty(s, b);
     }
 
-    public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
+    public void setByteProperty(final String s, final byte b)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        checkWritableProperties();
-        getJmsHeaders().setBoolean(propertyName, b);
+        _delegate.setByteProperty(s, b);
     }
 
-    public void setBooleanProperty(String propertyName, boolean b) throws JMSException
+    public void setShortProperty(final String s, final short i)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        checkWritableProperties();
-        getJmsHeaders().setBoolean(propertyName, b);
+        _delegate.setShortProperty(s, i);
     }
 
-    public void setByteProperty(String propertyName, byte b) throws JMSException
+    public void setIntProperty(final String s, final int i)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        checkWritableProperties();
-        getJmsHeaders().setByte(propertyName, new Byte(b));
+        _delegate.setIntProperty(s, i);
     }
 
-    public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException
+    public void setLongProperty(final String s, final long l)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        checkWritableProperties();
-        getJmsHeaders().setBytes(propertyName, bytes);
+        _delegate.setLongProperty(s, l);
     }
 
-    public void setShortProperty(String propertyName, short i) throws JMSException
+    public void setFloatProperty(final String s, final float v)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        checkWritableProperties();
-        getJmsHeaders().setShort(propertyName, new Short(i));
+        _delegate.setFloatProperty(s, v);
     }
 
-    public void setIntProperty(String propertyName, int i) throws JMSException
+    public void setDoubleProperty(final String s, final double v)
+            throws JMSException
     {
-        checkWritableProperties();
-        JMSHeaderAdapter.checkPropertyName(propertyName);
-        super.setIntProperty(new AMQShortString(propertyName), new Integer(i));
+        _delegate.setDoubleProperty(s, v);
     }
 
-    public void setLongProperty(String propertyName, long l) throws JMSException
+    public void setStringProperty(final String s, final String s1)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        checkWritableProperties();
-        getJmsHeaders().setLong(propertyName, new Long(l));
+        _delegate.setStringProperty(s, s1);
     }
 
-    public void setFloatProperty(String propertyName, float f) throws JMSException
+    public void setObjectProperty(final String s, final Object o)
+            throws JMSException
     {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
-
-        checkWritableProperties();
-        getJmsHeaders().setFloat(propertyName, new Float(f));
+        _delegate.setObjectProperty(s, o);
     }
 
-    public void setDoubleProperty(String propertyName, double v) throws JMSException
-    {
-        if (STRICT_AMQP_COMPLIANCE)
-        {
-            throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
-        }
 
-        checkWritableProperties();
-        getJmsHeaders().setDouble(propertyName, new Double(v));
-    }
 
-    public void setStringProperty(String propertyName, String value) throws JMSException
+    public void clearProperties() throws JMSException
     {
-        checkWritableProperties();
-        JMSHeaderAdapter.checkPropertyName(propertyName);
-        super.setLongStringProperty(new AMQShortString(propertyName), value);
+        _delegate.clearProperties();
     }
 
-    public void setObjectProperty(String propertyName, Object object) throws JMSException
+    public void clearBody() throws JMSException
     {
-        checkWritableProperties();
-        getJmsHeaders().setObject(propertyName, object);
-    }
+        clearBodyImpl();
+        _readableMessage = false;
 
-    protected void removeProperty(AMQShortString propertyName) throws JMSException
-    {
-        getJmsHeaders().remove(propertyName);
     }
 
-    protected void removeProperty(String propertyName) throws JMSException
-    {
-        getJmsHeaders().remove(propertyName);
-    }
 
     public void acknowledgeThis() throws JMSException
     {
-        // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
-        // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
-        if (_session != null)
-        {
-            if (_session.getAMQConnection().isClosed())
-            {
-                throw new javax.jms.IllegalStateException("Connection is already closed");
-            }
-
-            // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
-            // received on the session
-            _session.acknowledgeMessage(_deliveryTag, true);
-        }
+        _delegate.acknowledgeThis();
     }
 
     public void acknowledge() throws JMSException
     {
-        if (_session != null)
-        {
-            _session.acknowledge();
-        }
+        _delegate.acknowledge();
     }
 
     /**
@@ -617,12 +358,9 @@
      */
     public abstract String toBodyString() throws JMSException;
 
-    public String getMimeType()
-    {
-        return getMimeTypeAsShortString().toString();
-    }
+    protected abstract String getMimeType();
+
 
-    public abstract AMQShortString getMimeTypeAsShortString();
 
     public String toString()
     {
@@ -640,16 +378,23 @@
             buf.append("\nJMS Destination: ").append(getJMSDestination());
             buf.append("\nJMS Type: ").append(getJMSType());
             buf.append("\nJMS MessageID: ").append(getJMSMessageID());
-            buf.append("\nAMQ message number: ").append(_deliveryTag);
+            buf.append("\nAMQ message number: ").append(getDeliveryTag());
 
             buf.append("\nProperties:");
-            if (getJmsHeaders().isEmpty())
+            final Enumeration propertyNames = getPropertyNames();
+            if (!propertyNames.hasMoreElements())
             {
                 buf.append("<NONE>");
             }
             else
             {
-                buf.append('\n').append(getJmsHeaders().getHeaders());
+                buf.append('\n');
+                while(propertyNames.hasMoreElements())
+                {
+                    String propertyName = (String) propertyNames.nextElement();
+                    buf.append(propertyName).append(":\t").append(getObjectProperty(propertyName));
+                }
+
             }
 
             return buf.toString();
@@ -660,14 +405,10 @@
         }
     }
 
-    public void setUnderlyingMessagePropertiesMap(FieldTable messageProperties)
-    {
-        getContentHeaderProperties().setHeaders(messageProperties);
-    }
 
-    public JMSHeaderAdapter getJmsHeaders()
+    public AMQMessageDelegate getDelegate()
     {
-        return _headerAdapter;
+        return _delegate;
     }
 
     public ByteBuffer getData()
@@ -698,25 +439,6 @@
         }
     }
 
-    protected void checkWritableProperties() throws MessageNotWriteableException
-    {
-        if (_readableProperties)
-        {
-            throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
-        }
-        _contentHeaderProperties.updated();
-    }
-
-    public boolean isReadable()
-    {
-        return _readableMessage;
-    }
-
-    public boolean isWritable()
-    {
-        return !_readableMessage;
-    }
-
     public void reset()
     {
         if (!_changedData)
@@ -726,7 +448,6 @@
         else
         {
             _data.flip();
-            dataChanged();
             _changedData = false;
         }
     }
@@ -748,4 +469,66 @@
         _changedData = false;
     }
 
+    /**
+     * The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
+     * acknowledge()
+     *
+     * @param s the AMQ session that delivered this message
+     */
+    public void setAMQSession(AMQSession s)
+    {
+        _delegate.setAMQSession(s);
+    }
+
+    public AMQSession getAMQSession()
+    {
+        return _delegate.getAMQSession();
+    }
+
+    /**
+     * Get the AMQ message number assigned to this message
+     *
+     * @return the message number
+     */
+    public long getDeliveryTag()
+    {
+        return _delegate.getDeliveryTag();
+    }
+
+    /** Invoked prior to sending the message. Allows the message to be modified if necessary before sending. */
+    public void prepareForSending() throws JMSException
+    {
+    }
+
+
+    public void setContentType(String contentType)
+    {
+        _delegate.setContentType(contentType);
+    }
+
+    public String getContentType()
+    {
+        return _delegate.getContentType();
+    }
+
+    public void setEncoding(String encoding)
+    {
+        _delegate.setEncoding(encoding);
+    }
+
+    public String getEncoding()
+    {
+        return _delegate.getEncoding();
+    }
+
+    public String getReplyToString()
+    {
+        return _delegate.getReplyToString();
+    }
+
+    protected void removeProperty(final String propertyName) throws JMSException
+    {
+        _delegate.removeProperty(propertyName);
+    }
+
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -27,9 +27,9 @@
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpidity.transport.Struct;
-import org.apache.qpidity.transport.MessageProperties;
-import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpid.transport.Struct;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.DeliveryProperties;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,17 +38,11 @@
 
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.UUID;
 
 public abstract class AbstractJMSMessageFactory implements MessageFactory
 {
     private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
 
-    protected abstract AbstractJMSMessage createMessage(long messageNbr, ByteBuffer data, AMQShortString exchange,
-                                                        AMQShortString routingKey,
-                                                        BasicContentHeaderProperties contentHeader) throws AMQException;
-
     protected AbstractJMSMessage create08MessageWithBody(long messageNbr, ContentHeaderBody contentHeader,
                                                          AMQShortString exchange, AMQShortString routingKey,
                                                          List bodies) throws AMQException
@@ -105,23 +99,28 @@
                     .remaining());
         }
 
-        return createMessage(messageNbr, data, exchange, routingKey,
-                             (BasicContentHeaderProperties) contentHeader.properties);
+        AMQMessageDelegate delegate = new AMQMessageDelegate_0_8(messageNbr,
+                                                                 (BasicContentHeaderProperties) contentHeader.properties,
+                                                                 exchange, routingKey);
+
+        return createMessage(delegate, data);
     }
 
+    protected abstract AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException;
+
+
     protected AbstractJMSMessage create010MessageWithBody(long messageNbr, Struct[] contentHeader,
-                                                          AMQShortString exchange, AMQShortString routingKey,
-                                                          List bodies, String replyToURL) throws AMQException
+                                                          java.nio.ByteBuffer body) throws AMQException
     {
         ByteBuffer data;
         final boolean debug = _logger.isDebugEnabled();
 
-        // we optimise the non-fragmented case to avoid copying
-        if ((bodies != null))
+
+        if (body != null)
         {
-            data = ByteBuffer.wrap((java.nio.ByteBuffer) bodies.get(0));
+            data = ByteBuffer.wrap(body);
         }
-        else // bodies == null
+        else // body == null
         {
             data = ByteBuffer.allocate(0);
         }
@@ -131,40 +130,13 @@
             _logger.debug("Creating message from buffer with position=" + data.position() + " and remaining=" + data
                     .remaining());
         }
-        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
         // set the properties of this message
         MessageProperties mprop = (MessageProperties) contentHeader[0];
         DeliveryProperties devprop = (DeliveryProperties) contentHeader[1];
-        props.setContentType(mprop.getContentType());
-        props.setCorrelationId(asString(mprop.getCorrelationId()));
-        String encoding = mprop.getContentEncoding();
-        if (encoding != null && !encoding.equals(""))
-        {
-            props.setEncoding(encoding);
-        }
-        if (devprop.hasDeliveryMode())
-        {
-            props.setDeliveryMode((byte) devprop.getDeliveryMode().getValue());
-        }
-        props.setExpiration(devprop.getExpiration());
-        UUID mid = mprop.getMessageId();
-        props.setMessageId(mid == null ? null : "ID:" + mid.toString());
-        if (devprop.hasPriority())
-        {
-            props.setPriority((byte) devprop.getPriority().getValue());
-        }
-        props.setReplyTo(replyToURL);
-        props.setTimestamp(devprop.getTimestamp());
-        String type = null;
-        Map<String,Object> map = mprop.getApplicationHeaders();
-        if (map != null)
-        {
-            type = (String) map.get(AbstractJMSMessage.JMS_TYPE);
-        }
-        props.setType(type);
-        props.setUserId(asString(mprop.getUserId()));
-        props.setHeaders(FiledTableSupport.convertToFieldTable(mprop.getApplicationHeaders()));        
-        AbstractJMSMessage message = createMessage(messageNbr, data, exchange, routingKey, props);        
+
+        AMQMessageDelegate delegate = new AMQMessageDelegate_0_10(mprop, devprop, messageNbr);
+
+        AbstractJMSMessage message = createMessage(delegate, data);
         return message;
     }
 
@@ -192,12 +164,11 @@
     }
 
     public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, Struct[] contentHeader,
-                                            AMQShortString exchange, AMQShortString routingKey, List bodies,
-                                            String replyToURL)
+                                            java.nio.ByteBuffer body)
             throws JMSException, AMQException
     {
         final AbstractJMSMessage msg =
-                create010MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies, replyToURL);
+                create010MessageWithBody(messageNbr, contentHeader, body);
         msg.setJMSRedelivered(redelivered);
         msg.receivedFromServer();
         return msg;

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Thu Aug 14 20:40:49 2008
@@ -33,46 +33,47 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
 {
     public static final String MIME_TYPE = "application/octet-stream";
-    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
 
 
-    public JMSBytesMessage()
+
+    public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory)
     {
-        this(null);
+        this(delegateFactory,null);
+
     }
 
     /**
      * Construct a bytes message with existing data.
      *
+     * @param delegateFactory
      * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
-     *             set to auto expand
      */
-    JMSBytesMessage(ByteBuffer data)
+    JMSBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
     {
-        super(data); // this instanties a content header
+
+        super(delegateFactory, data); // this instanties a content header
     }
 
-    JMSBytesMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
-                    AMQShortString routingKey, ByteBuffer data) throws AMQException
+    JMSBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-        super(messageNbr, contentHeader, exchange, routingKey, data);
+        super(delegate, data);
     }
 
+
     public void reset()
     {
         super.reset();
         _readableMessage = true;
     }
 
-    public AMQShortString getMimeTypeAsShortString()
+    protected String getMimeType()
     {
-        return MIME_TYPE_SHORT_STRING;
+        return MIME_TYPE;
     }
 
     public long getBodyLength() throws JMSException

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -25,21 +25,18 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
 {
-    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
-                                               AMQShortString exchange, AMQShortString routingKey,
-                                               BasicContentHeaderProperties contentHeader) throws AMQException
+    protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-        return new JMSBytesMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+        return new JMSBytesMessage(delegate, data);
     }
 
-    public AbstractJMSMessage createMessage() throws JMSException
+    public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
     {
-        return new JMSBytesMessage();
+        return new JMSBytesMessage(delegateFactory);
     }
 
     // 0_10 specific

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java Thu Aug 14 20:40:49 2008
@@ -467,7 +467,7 @@
         return getPropertyNames();
     }
 
-    protected static void checkPropertyName(CharSequence propertyName)
+    protected void checkPropertyName(CharSequence propertyName)
     {
         if (propertyName == null)
         {
@@ -481,7 +481,7 @@
         checkIdentiferFormat(propertyName);
     }
 
-    protected static void checkIdentiferFormat(CharSequence propertyName)
+    protected void checkIdentiferFormat(CharSequence propertyName)
     {
 //        JMS requirements 3.5.1 Property Names
 //        Identifiers:
@@ -492,14 +492,14 @@
 //          '_' and '$'. An identifier part character is any character for which the
 //          method Character.isJavaIdentifierPart returns true.
 //        - Identifiers cannot be the names NULL, TRUE, or FALSE.
-//        � Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
+//          Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or
 //          ESCAPE.
-//        � Identifiers are either header field references or property references. The
+//          Identifiers are either header field references or property references. The
 //          type of a property value in a message selector corresponds to the type
 //          used to set the property. If a property that does not exist in a message is
 //          referenced, its value is NULL. The semantics of evaluating NULL values
-//          in a selector are described in Section 3.8.1.2, �Null Values.�
-//        � The conversions that apply to the get methods for properties do not
+//          in a selector are described in Section 3.8.1.2, Null Values.
+//          The conversions that apply to the get methods for properties do not
 //          apply when a property is used in a message selector expression. For
 //          example, suppose you set a property as a string value, as in the
 //          following:
@@ -507,8 +507,8 @@
 //          The following expression in a message selector would evaluate to false,
 //          because a string cannot be used in an arithmetic expression:
 //          "NumberOfOrders > 1"
-//        � Identifiers are case sensitive.
-//        � Message header field references are restricted to JMSDeliveryMode,
+//          Identifiers are case sensitive.
+//          Message header field references are restricted to JMSDeliveryMode,
 //          JMSPriority, JMSMessageID, JMSTimestamp, JMSCorrelationID, and
 //          JMSType. JMSMessageID, JMSCorrelationID, and JMSType values may be
 //          null and if so are treated as a NULL value.

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Thu Aug 14 20:40:49 2008
@@ -24,7 +24,6 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 import org.slf4j.Logger;
@@ -44,18 +43,19 @@
     private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class);
 
     public static final String MIME_TYPE = "jms/map-message";
-    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
 
     private Map<String, Object> _map = new HashMap<String, Object>();
 
-    public JMSMapMessage() throws JMSException
+    public JMSMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
     {
-        this(null);
+        this(delegateFactory, null);
     }
 
-    JMSMapMessage(ByteBuffer data) throws JMSException
+    JMSMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
     {
-        super(data); // this instantiates a content header
+
+        super(delegateFactory, data); // this instantiates a content header
         if(data != null)
         {
             populateMapFromData();
@@ -63,10 +63,10 @@
 
     }
 
-    JMSMapMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
-        ByteBuffer data) throws AMQException
+    JMSMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-        super(messageNbr, contentHeader, exchange, routingKey, data);
+
+        super(delegate, data);
         try
         {
             populateMapFromData();
@@ -79,14 +79,15 @@
 
     }
 
+
     public String toBodyString() throws JMSException
     {
         return _map == null ? "" : _map.toString();
     }
 
-    public AMQShortString getMimeTypeAsShortString()
+    protected String getMimeType()
     {
-        return MIME_TYPE_SHORT_STRING;
+        return MIME_TYPE;
     }
 
     public ByteBuffer getData()

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -25,21 +25,18 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSMapMessageFactory extends AbstractJMSMessageFactory
 {
-    public AbstractJMSMessage createMessage() throws JMSException
+    public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
     {
-        return new JMSMapMessage();
+        return new JMSMapMessage(delegateFactory);
     }
 
-    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
-                                               AMQShortString exchange, AMQShortString routingKey, 
-                                               BasicContentHeaderProperties contentHeader) throws AMQException
+    protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-        return new JMSMapMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+        return new JMSMapMessage(delegate, data);
 
     }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Thu Aug 14 20:40:49 2008
@@ -37,63 +37,65 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
 
 public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
 {
     public static final String MIME_TYPE = "application/java-object-stream";
-    private static final AMQShortString MIME_TYPE_SHORT_STRING = new AMQShortString(MIME_TYPE);
+
 
     private static final int DEFAULT_BUFFER_SIZE = 1024;
 
     /**
      * Creates empty, writable message for use by producers
+     * @param delegateFactory
      */
-    public JMSObjectMessage()
+    public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory)
     {
-        this(null);
+        this(delegateFactory, null);
     }
 
-    private JMSObjectMessage(ByteBuffer data)
+    private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
     {
-        super(data);
+        super(delegateFactory, data);
         if (data == null)
         {
             _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
             _data.setAutoExpand(true);
         }
 
-        getContentHeaderProperties().setContentType(MIME_TYPE_SHORT_STRING);
+        setContentType(getMimeType());
     }
 
     /**
      * Creates read only message for delivery to consumers
      */
-    JMSObjectMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
-        ByteBuffer data) throws AMQException
-    {
-        super(messageNbr, contentHeader, exchange, routingKey, data);
-    }
+
+      JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+      {
+          super(delegate, data);
+      }
+
 
     public void clearBodyImpl() throws JMSException
     {
         if (_data != null)
         {
             _data.release();
+            _data = null;
         }
 
-        _data = null;
+
 
     }
 
     public String toBodyString() throws JMSException
     {
-        return toString(_data);
+        return String.valueOf(getObject());
     }
 
-    public AMQShortString getMimeTypeAsShortString()
+    public String getMimeType()
     {
-        return MIME_TYPE_SHORT_STRING;
+        return MIME_TYPE;
     }
 
     public void setObject(Serializable serializable) throws JMSException
@@ -172,26 +174,4 @@
         catch (IOException ignore)
         { }
     }
-
-    private static String toString(ByteBuffer data)
-    {
-        if (data == null)
-        {
-            return null;
-        }
-
-        int pos = data.position();
-        try
-        {
-            return data.getString(Charset.forName("UTF8").newDecoder());
-        }
-        catch (CharacterCodingException e)
-        {
-            return null;
-        }
-        finally
-        {
-            data.position(pos);
-        }
-    }
 }

Modified: incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java?rev=686136&r1=686135&r2=686136&view=diff
==============================================================================
--- incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java (original)
+++ incubator/qpid/branches/qpid.0-10/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java Thu Aug 14 20:40:49 2008
@@ -25,20 +25,17 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 
 public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
 {
-    protected AbstractJMSMessage createMessage(long deliveryTag, ByteBuffer data,
-                                               AMQShortString exchange, AMQShortString routingKey, 
-                                               BasicContentHeaderProperties contentHeader) throws AMQException
+    protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
     {
-        return new JMSObjectMessage(deliveryTag, contentHeader, exchange, routingKey, data);
+        return new JMSObjectMessage(delegate, data);
     }
 
-    public AbstractJMSMessage createMessage() throws JMSException
+    public AbstractJMSMessage createMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
     {
-        return new JMSObjectMessage();
+        return new JMSObjectMessage(delegateFactory);
     }
 }