You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/03/22 14:15:14 UTC

svn commit: r521253 [8/10] - in /incubator/qpid/branches/java.multi_version: ./ gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/ gentools/templ.cpp/class/ gentools/templ.cpp/field/ gentools/templ.cpp/method/ gentools/templ.cpp/model/ gentools...

Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Mar 22 06:14:42 2007
@@ -33,6 +33,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
@@ -42,6 +43,8 @@
 import org.apache.qpid.framing.BasicCancelBody;
 import org.apache.qpid.framing.BasicCancelOkBody;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQMethodFactory;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.jms.MessageConsumer;
 import org.apache.qpid.jms.Session;
 
@@ -438,16 +441,10 @@
             {
                 if (sendClose)
                 {
-                    // TODO: Be aware of possible changes to parameter order as versions change.
-                    final AMQFrame cancelFrame = BasicCancelBody.createAMQFrame(_channelId,
-                                                                                _protocolHandler.getProtocolMajorVersion(),
-                                                                                _protocolHandler.getProtocolMinorVersion(),
-                                                                                _consumerTag,    // consumerTag
-                                                                                false);    // nowait
-
                     try
                     {
-                        _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+                        AMQMethodBody cancelBody = getAMQMethodFactory().createConsumerCancel(_consumerTag);
+                        sendCommandReceiveResponse(cancelBody);
                     }
                     catch (AMQException e)
                     {
@@ -467,6 +464,16 @@
         }
     }
 
+    private AMQMethodBody sendCommandReceiveResponse(AMQMethodBody cancelBody) throws AMQException
+    {
+        return getSession().sendCommandReceiveResponse(cancelBody);
+    }
+
+    private AMQMethodFactory getAMQMethodFactory()
+    {
+        return getSession().getAMQMethodFactory();
+    }
+
     /**
      * Called when you need to invalidate a consumer. Used for example when failover has occurred and the client has
      * vetoed automatic resubscription. The caller must hold the failover mutex.
@@ -490,14 +497,14 @@
 
         if (debug)
         {
-            _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().deliveryTag);
+            _logger.debug("notifyMessage called with message number " + messageFrame.getDeliverBody().getDeliveryTag());
         }
         try
         {
-            AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().deliveryTag,
-                                                                          messageFrame.getDeliverBody().redelivered,
-                                                                          messageFrame.getDeliverBody().exchange,
-                                                                          messageFrame.getDeliverBody().routingKey,
+            AbstractJMSMessage jmsMessage = _messageFactory.createMessage(messageFrame.getDeliverBody().getDeliveryTag(),
+                                                                          messageFrame.getDeliverBody().getRedelivered(),
+                                                                          messageFrame.getDeliverBody().getExchange(),
+                                                                          messageFrame.getDeliverBody().getRoutingKey(),
                                                                           messageFrame.getContentHeader(),
                                                                           messageFrame.getBodies());
 

Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Mar 22 06:14:42 2007
@@ -38,17 +38,11 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageConverter;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.BasicConsumeBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.BasicPublishBody;
-import org.apache.qpid.framing.CompositeAMQDataBlock;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
+import org.apache.qpid.framing.*;
 
 public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
@@ -91,7 +85,7 @@
      */
     private String _mimeType;
 
-    private AMQProtocolHandler _protocolHandler;
+    private AMQProtocolHandlerImpl _protocolHandler;
 
     /**
      * True if this producer was created from a transacted session
@@ -121,7 +115,7 @@
     private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
-                                   AMQSession session, AMQProtocolHandler protocolHandler, long producerId,
+                                   AMQSession session, AMQProtocolHandlerImpl protocolHandler, long producerId,
                                    boolean immediate, boolean mandatory, boolean waitUntilSent)
     {
         _connection = connection;
@@ -152,20 +146,28 @@
     private void declareDestination(AMQDestination destination)
     {
         // Declare the exchange
-        // Note that the durable and internal arguments are ignored since passive is set to false
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame declare =
-            ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
-                                               _protocolHandler.getProtocolMinorVersion(), null, // arguments
-                                               false, // autoDelete
-                                               false, // durable
-                                               destination.getExchangeName(), // exchange
-                                               false, // internal
-                                               true, // nowait
-                                               false, // passive
-                                               _session.getTicket(), // ticket
-                                               destination.getExchangeClass()); // type
-        _protocolHandler.writeFrame(declare);
+
+        ExchangeDeclareBody exchangeDeclareBody =
+                getAMQMethodFactory().createExchangeDeclare(destination.getExchangeName(),
+                                                            destination.getExchangeClass(),
+                                                            _session.getTicket());
+        sendCommand(exchangeDeclareBody);
+
+    }
+
+    private void sendCommand(AMQMethodBody command)
+    {
+        getSession().sendCommand(command);
+    }
+
+    private AMQMethodBody sendCommandReceiveResponse(AMQMethodBody command) throws AMQException
+    {
+        return getSession().sendCommandReceiveResponse(command);
+    }
+
+    private AMQMethodFactory getAMQMethodFactory()
+    {
+        return getSession().getAMQMethodFactory();
     }
 
     public void setDisableMessageID(boolean b) throws JMSException
@@ -467,21 +469,9 @@
 
         message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        AMQFrame publishFrame =
-            BasicPublishBody.createAMQFrame(
-                _channelId, _protocolHandler.getProtocolMajorVersion(), _protocolHandler.getProtocolMinorVersion(),
-                destination.getExchangeName(), // exchange
-                immediate, // immediate
-                mandatory, // mandatory
-                destination.getRoutingKey(), // routingKey
-                _session.getTicket()); // ticket
-
         message.prepareForSending();
         ByteBuffer payload = message.getData();
-        BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
+        CommonContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
 
         if (!_disableTimestamps)
         {
@@ -501,37 +491,15 @@
         contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
         contentHeaderProperties.setPriority((byte) priority);
 
-        final int size = (payload != null) ? payload.limit() : 0;
-        final int contentBodyFrameCount = calculateContentBodyFrameCount(payload);
-        final AMQFrame[] frames = new AMQFrame[2 + contentBodyFrameCount];
+        getSession().getProtocolOutputHandler().publishMessage(getSession().getChannelId(),
+                                                               destination.getExchangeName(),
+                                                               destination.getRoutingKey(),
+                                                               immediate,
+                                                               mandatory,
+                                                               payload,
+                                                               contentHeaderProperties,
+                                                               getSession().getTicket());
 
-        if (payload != null)
-        {
-            createContentBodies(payload, frames, 2, _channelId);
-        }
-
-        if ((contentBodyFrameCount != 0) && _logger.isDebugEnabled())
-        {
-            _logger.debug("Sending content body frames to " + destination);
-        }
-
-        // weight argument of zero indicates no child content headers, just bodies
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        AMQFrame contentHeaderFrame =
-            ContentHeaderBody.createAMQFrame(_channelId,
-                                             BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
-                                                                       _protocolHandler.getProtocolMinorVersion()), 0,
-                                             contentHeaderProperties, size);
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Sending content header frame to " + destination);
-        }
-
-        frames[0] = publishFrame;
-        frames[1] = contentHeaderFrame;
-        CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
-        _protocolHandler.writeFrame(compositeFrame, wait);
 
         if (message != origMessage)
         {
@@ -544,6 +512,8 @@
         }
     }
 
+
+
     private void checkTemporaryDestination(AMQDestination destination) throws JMSException
     {
         if (destination instanceof TemporaryDestination)
@@ -564,59 +534,6 @@
         }
     }
 
-    /**
-     * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
-     * maximum frame size.
-     *
-     * @param payload
-     * @param frames
-     * @param offset
-     * @param channelId @return the array of content bodies
-     */
-    private void createContentBodies(ByteBuffer payload, AMQFrame[] frames, int offset, int channelId)
-    {
-
-        if (frames.length == (offset + 1))
-        {
-            frames[offset] = ContentBody.createAMQFrame(channelId, new ContentBody(payload));
-        }
-        else
-        {
-
-            final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
-            long remaining = payload.remaining();
-            for (int i = offset; i < frames.length; i++)
-            {
-                payload.position((int) framePayloadMax * (i - offset));
-                int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
-                payload.limit(payload.position() + length);
-                frames[i] = ContentBody.createAMQFrame(channelId, new ContentBody(payload.slice()));
-
-                remaining -= length;
-            }
-        }
-
-    }
-
-    private int calculateContentBodyFrameCount(ByteBuffer payload)
-    {
-        // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
-        // (0xCE byte).
-        int frameCount;
-        if ((payload == null) || (payload.remaining() == 0))
-        {
-            frameCount = 0;
-        }
-        else
-        {
-            int dataLength = payload.remaining();
-            final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
-            int lastFrame = ((dataLength % framePayloadMax) > 0) ? 1 : 0;
-            frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
-        }
-
-        return frameCount;
-    }
 
     public void setMimeType(String mimeType) throws JMSException
     {

Modified: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java?view=diff&rev=521253&r1=511387&r2=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java Thu Mar 22 06:14:42 2007
@@ -25,7 +25,7 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.IoSession;
 import org.apache.qpid.AMQDisconnectedException;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.protocol.AMQProtocolHandlerImpl;
 import org.apache.qpid.client.state.AMQStateManager;
 
 /**
@@ -42,7 +42,7 @@
     private static final Logger _logger = Logger.getLogger(FailoverHandler.class);
 
     private final IoSession _session;
-    private AMQProtocolHandler _amqProtocolHandler;
+    private AMQProtocolHandlerImpl _amqProtocolHandler;
 
     /**
      * Used where forcing the failover host
@@ -54,7 +54,7 @@
      */
     private int _port;
 
-    public FailoverHandler(AMQProtocolHandler amqProtocolHandler, IoSession session)
+    public FailoverHandler(AMQProtocolHandlerImpl amqProtocolHandler, IoSession session)
     {
         _amqProtocolHandler = amqProtocolHandler;
         _session = session;

Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ChannelCloseMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,24 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.amqp_0_9.ChannelCloseOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+public class ChannelCloseMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ChannelCloseMethodHandler
+{
+    private static final Logger _logger = Logger.getLogger(ChannelCloseMethodHandler.class);
+
+    private static ChannelCloseMethodHandler _handler = new ChannelCloseMethodHandler();
+
+    public static ChannelCloseMethodHandler getInstance()
+    {
+        return _handler;
+    }
+
+
+    protected ChannelCloseOkBody createChannelCloseOkBody()
+    {
+        return new ChannelCloseOkBodyImpl();
+    }
+}

Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionCloseMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,29 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.amqp_0_9.ConnectionCloseOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+public class ConnectionCloseMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionCloseMethodHandler
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionCloseMethodHandler.class);
+
+    private static ConnectionCloseMethodHandler _handler = new ConnectionCloseMethodHandler();
+
+    public static ConnectionCloseMethodHandler getInstance()
+    {
+        return _handler;
+    }
+
+    protected ConnectionCloseMethodHandler()
+    {
+    }
+
+
+    protected ConnectionCloseOkBody createConnectionCloseOkBody()
+    {
+        return new ConnectionCloseOkBodyImpl();
+    }
+
+}

Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionSecureMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,28 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.client.state.StateAwareMethodListener;
+import org.apache.qpid.client.state.AMQStateManager;
+import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionSecureOkBodyImpl;
+
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+public class ConnectionSecureMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionSecureMethodHandler
+{
+    private static final ConnectionSecureMethodHandler _instance = new ConnectionSecureMethodHandler();
+
+    public static ConnectionSecureMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+    protected ConnectionSecureOkBody createConnectionSecureOkBody(byte[] response)
+    {
+        return new ConnectionSecureOkBodyImpl(response);
+    }
+}

Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_0_9/ConnectionTuneMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,41 @@
+package org.apache.qpid.client.handler.amqp_0_9;
+
+import org.apache.qpid.client.ConnectionTuneParameters;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ConnectionOpenBody;
+import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.amqp_0_9.ConnectionOpenBodyImpl;
+import org.apache.qpid.framing.amqp_0_9.ConnectionTuneOkBodyImpl;
+
+import org.apache.log4j.Logger;
+
+public class ConnectionTuneMethodHandler extends org.apache.qpid.client.handler.amqp_8_0.ConnectionTuneMethodHandler
+{
+    private static final Logger _logger = Logger.getLogger(ConnectionTuneMethodHandler.class);
+
+    private static final ConnectionTuneMethodHandler _instance = new ConnectionTuneMethodHandler();
+
+    public static ConnectionTuneMethodHandler getInstance()
+    {
+        return _instance;
+    }
+
+
+    protected ConnectionOpenBody createConnectionOpenBody(AMQShortString path, AMQShortString capabilities, boolean insist)
+    {
+
+        return new ConnectionOpenBodyImpl(path,// virtualHost
+            capabilities,	// capabilities
+            insist);	// insist
+
+    }
+
+    protected ConnectionTuneOkBody createTuneOkBody(ConnectionTuneParameters params)
+    {
+        // Be aware of possible changes to parameter order as versions change.
+        return new ConnectionTuneOkBodyImpl(
+            params.getChannelMax(),	// channelMax
+            params.getFrameMax(),	// frameMax
+            params.getHeartbeat());	// heartbeat
+    }
+}

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -45,10 +45,12 @@
      {
      }
 
-     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+     public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
      {
+
          _logger.debug("New BasicCancelOk method received");
+         final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
          BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
-         protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);                  
+         protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.getConsumerTag());
      }
 }

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicCancelOkMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -40,8 +40,9 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager,  AMQMethodEvent evt) throws AMQException
     {
+        final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
         final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(), (BasicDeliverBody) evt.getMethod());
         _logger.debug("New JmsDeliver method received");
         protocolSession.unprocessedMessageReceived(msg);

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicDeliverMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -40,9 +40,10 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager,  AMQMethodEvent evt) throws AMQException
     {
         _logger.debug("New JmsBounce method received");
+        final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
         final UnprocessedMessage msg = new UnprocessedMessage(evt.getChannelId(),(BasicReturnBody)evt.getMethod());
 
         protocolSession.unprocessedMessageReceived(msg);

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/BasicReturnMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQChannelClosedException;
@@ -29,10 +29,10 @@
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ChannelCloseBody;
 import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.amqp_8_0.ChannelCloseOkBodyImpl;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
@@ -47,21 +47,23 @@
         return _handler;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
     {
         _logger.debug("ChannelClose method received");
+        final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
         ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
 
-        AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
-        AMQShortString reason = method.replyText;
+        AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+        AMQShortString reason = method.getReplyText();
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
         }
 
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame frame = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), method.getMajor(), method.getMinor());
-        protocolSession.writeFrame(frame);
+        protocolSession.getOutputHandler().sendCommand(evt.getChannelId(), createChannelCloseOkBody());
+        
+
+
         if (errorCode != AMQConstant.REPLY_SUCCESS)
         {
             if (_logger.isDebugEnabled())
@@ -95,5 +97,10 @@
 
         }
         protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+    }
+
+    protected ChannelCloseOkBody createChannelCloseOkBody()
+    {
+        return new ChannelCloseOkBodyImpl();
     }
 }

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,11 +18,10 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -38,7 +37,7 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
     {
         _logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
 

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelCloseOkMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ChannelFlowOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,11 +18,10 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.ChannelFlowOkBody;
@@ -42,9 +41,9 @@
      {
      }
 
-     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+     public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
      {
          ChannelFlowOkBody method = (ChannelFlowOkBody) evt.getMethod();
-         _logger.debug("Received Channel.Flow-Ok message, active = " + method.active);
+         _logger.debug("Received Channel.Flow-Ok message, active = " + method.getActive());
      }
 }

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ChannelFlowOkMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQConnectionClosedException;
@@ -31,6 +31,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionCloseBody;
 import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionCloseOkBodyImpl;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
@@ -45,26 +46,31 @@
         return _handler;
     }
 
-    private ConnectionCloseMethodHandler()
+    protected ConnectionCloseMethodHandler()
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
     {
         _logger.info("ConnectionClose frame received");
+        final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
         ConnectionCloseBody method = (ConnectionCloseBody) evt.getMethod();
 
         // does it matter
         //stateManager.changeState(AMQState.CONNECTION_CLOSING);
 
-        AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
-        AMQShortString reason = method.replyText;
+        AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+        AMQShortString reason = method.getReplyText();
 
         try
         {
-            // TODO: check whether channel id of zero is appropriate
-            // Be aware of possible changes to parameter order as versions change.
-            protocolSession.writeFrame(ConnectionCloseOkBody.createAMQFrame((short) 0, method.getMajor(), method.getMinor()));
+
+
+
+            ConnectionCloseOkBody closeOkBody = createConnectionCloseOkBody();
+            protocolSession.getOutputHandler().sendCommand(0, closeOkBody);
+
+
 
             if (errorCode != AMQConstant.REPLY_SUCCESS)
             {
@@ -97,4 +103,10 @@
             stateManager.changeState(AMQState.CONNECTION_CLOSED);
         }
     }
+
+    protected ConnectionCloseOkBody createConnectionCloseOkBody()
+    {
+        return new ConnectionCloseOkBodyImpl();
+    }
+
 }

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionCloseMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionOpenOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,10 +18,9 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
@@ -40,7 +39,7 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
     {
         stateManager.changeState(AMQState.CONNECTION_OPEN);
     }

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionOpenOkMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionRedirectMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -45,12 +45,13 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
     {
         _logger.info("ConnectionRedirect frame received");
+        final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
         ConnectionRedirectBody method = (ConnectionRedirectBody) evt.getMethod();
 
-        String host = method.host.toString();
+        String host = method.getHost().toString();
         // the host is in the form hostname:port with the port being optional
         int portIndex = host.indexOf(':');
 

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionRedirectMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionSecureMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import javax.security.sasl.SaslClient;
 import javax.security.sasl.SaslException;
@@ -27,9 +27,9 @@
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.ConnectionSecureBody;
 import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionSecureOkBodyImpl;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
 public class ConnectionSecureMethodHandler implements StateAwareMethodListener
@@ -41,8 +41,9 @@
         return _instance;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
     {
+        final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
         SaslClient client = protocolSession.getSaslClient();
         if (client == null)
         {
@@ -54,14 +55,10 @@
         try
         {
             // Evaluate server challenge
-            byte[] response = client.evaluateChallenge(body.challenge);
-            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-            // Be aware of possible changes to parameter order as versions change.
-            AMQFrame responseFrame = ConnectionSecureOkBody.createAMQFrame(evt.getChannelId(),
-                body.getMajor(), body.getMinor(),
-                response);	// response
-            protocolSession.writeFrame(responseFrame);
+            byte[] response = client.evaluateChallenge(body.getChallenge());
+
+            ConnectionSecureOkBody secureOkBody = createConnectionSecureOkBody(response);
+            protocolSession.getOutputHandler().sendCommand(evt.getChannelId(),secureOkBody);
         }
         catch (SaslException e)
         {
@@ -69,5 +66,10 @@
         }
 
 
+    }
+
+    protected ConnectionSecureOkBody createConnectionSecureOkBody(byte[] response)
+    {
+        return new ConnectionSecureOkBodyImpl(response);
     }
 }

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionSecureMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java (from r511389, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java&r1=511389&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import java.io.UnsupportedEncodingException;
 import java.util.HashSet;
@@ -45,6 +45,7 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.amqp_8_0.ConnectionStartOkBodyImpl;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
 public class ConnectionStartMethodHandler implements StateAwareMethodListener
@@ -61,15 +62,16 @@
     private ConnectionStartMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt)
                         throws AMQException
     {
         _log.debug("public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, "
                    + "AMQMethodEvent evt): called");
 
+        final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
         ConnectionStartBody body = (ConnectionStartBody) evt.getMethod();
 
-        ProtocolVersion pv = new ProtocolVersion((byte) body.versionMajor,(byte) body.versionMinor);
+        ProtocolVersion pv = new ProtocolVersion((byte) body.getVersionMajor(),(byte) body.getVersionMinor());
 
 
         // For the purposes of interop, we can make the client accept the broker's version string.
@@ -83,26 +85,26 @@
 
         if (pv.isSupported())
         {
-            protocolSession.setProtocolVersion(pv.getMajorVersion(), pv.getMinorVersion());
+            protocolSession.setProtocolVersion(pv);
 
             try
             {
                 // Used to hold the SASL mechanism to authenticate with.
                 String mechanism;
 
-                if (body.mechanisms == null)
+                if (body.getMechanisms() == null)
                 {
                     throw new AMQException("mechanism not specified in ConnectionStart method frame");
                 }
                 else
                 {
-                    mechanism = chooseMechanism(body.mechanisms);
+                    mechanism = chooseMechanism(body.getMechanisms());
                     _log.debug("mechanism = " + mechanism);
                 }
 
                 if (mechanism == null)
                 {
-                    throw new AMQException("No supported security mechanism found, passed: " + new String(body.mechanisms));
+                    throw new AMQException("No supported security mechanism found, passed: " + new String(body.getMechanisms()));
                 }
 
                 byte[] saslResponse;
@@ -128,12 +130,12 @@
                     throw new AMQException("Unable to create SASL client: " + e, e);
                 }
 
-                if (body.locales == null)
+                if (body.getLocales() == null)
                 {
                     throw new AMQException("Locales is not defined in Connection Start method");
                 }
 
-                final String locales = new String(body.locales, "utf8");
+                final String locales = new String(body.getLocales(), "utf8");
                 final StringTokenizer tokenizer = new StringTokenizer(locales, " ");
                 String selectedLocale = null;
                 if (tokenizer.hasMoreTokens())
@@ -148,24 +150,19 @@
                 stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
                 FieldTable clientProperties = FieldTableFactory.newFieldTable();
 
-                clientProperties.setString(new AMQShortString(ClientProperties.instance.toString()),
+                clientProperties.setString(ClientProperties.instance.getName(),
                                            protocolSession.getClientID());
-                clientProperties.setString(new AMQShortString(ClientProperties.product.toString()),
+                clientProperties.setString(ClientProperties.product.getName(),
                                            QpidProperties.getProductName());
-                clientProperties.setString(new AMQShortString(ClientProperties.version.toString()),
+                clientProperties.setString(ClientProperties.version.getName(),
                                            QpidProperties.getReleaseVersion());
-                clientProperties.setString(new AMQShortString(ClientProperties.platform.toString()), getFullSystemInfo());
+                clientProperties.setString(ClientProperties.platform.getName(), getFullSystemInfo());
 
-                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-                // Be aware of possible changes to parameter order as versions change.
-                protocolSession.writeFrame(ConnectionStartOkBody.createAMQFrame(evt.getChannelId(),
-                                                                                protocolSession.getProtocolMajorVersion(),
-                                                                                protocolSession.getProtocolMinorVersion(),
-                                                                                clientProperties, // clientProperties
+                ConnectionStartOkBody startOkBody = createConnectionStartOkBody(clientProperties, // clientProperties
                                                                                 new AMQShortString(selectedLocale), // locale
                                                                                 new AMQShortString(mechanism), // mechanism
-                                                                                saslResponse)); // response
+                                                                                saslResponse); // response
+                protocolSession.getOutputHandler().sendCommand(0, startOkBody);
 
             }
             catch (UnsupportedEncodingException e)
@@ -175,14 +172,19 @@
         }
         else
         {
-            _log.error("Broker requested Protocol [" + body.versionMajor + "-" + body.versionMinor
+            _log.error("Broker requested Protocol [" + body.getVersionMajor() + "-" + body.getVersionMinor()
                        + "] which is not supported by this version of the client library");
 
             protocolSession.closeProtocolSession();
         }
     }
 
-  
+    private ConnectionStartOkBody createConnectionStartOkBody(FieldTable clientProperties, AMQShortString locale, AMQShortString mechanism, byte[] saslResponse)
+    {
+        return new ConnectionStartOkBodyImpl(clientProperties,mechanism,saslResponse,locale);
+    }
+
+
     private String getFullSystemInfo()
     {
         StringBuffer fullSystemInfo = new StringBuffer();

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionStartMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionTuneMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -18,7 +18,7 @@
  * under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
@@ -27,11 +27,12 @@
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
-import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ConnectionOpenBody;
 import org.apache.qpid.framing.ConnectionTuneBody;
 import org.apache.qpid.framing.ConnectionTuneOkBody;
+import org.apache.qpid.framing.amqp_8_0.ConnectionOpenBodyImpl;
+import org.apache.qpid.framing.amqp_8_0.ConnectionTuneOkBodyImpl;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
 public class ConnectionTuneMethodHandler implements StateAwareMethodListener
@@ -49,9 +50,10 @@
     {
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
     {
         _logger.debug("ConnectionTune frame received");
+        final AMQProtocolSession protocolSession = stateManager.getProtocolSession();
         ConnectionTuneBody frame = (ConnectionTuneBody) evt.getMethod();
 
         ConnectionTuneParameters params = protocolSession.getConnectionTuneParameters();
@@ -60,36 +62,36 @@
             params = new ConnectionTuneParameters();
         }
 
-        params.setFrameMax(frame.frameMax);        
-        params.setChannelMax(frame.channelMax);
-        params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.heartbeat));
+        params.setFrameMax(frame.getFrameMax());
+        params.setChannelMax(frame.getChannelMax());
+        params.setHeartbeat(Integer.getInteger("amqj.heartbeat.delay", frame.getHeartbeat()));
         protocolSession.setConnectionTuneParameters(params);
 
         stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
-        protocolSession.writeFrame(createTuneOkFrame(evt.getChannelId(), params,frame.getMajor(), frame.getMinor()));
+        protocolSession.getOutputHandler().sendCommand(evt.getChannelId(),
+                                                       createTuneOkBody(params));
 
         String host = protocolSession.getAMQConnection().getVirtualHost();
         AMQShortString virtualHost = new AMQShortString("/" + host);
 
 
-        protocolSession.writeFrame(createConnectionOpenFrame(evt.getChannelId(), virtualHost, null, true,frame.getMajor(), frame.getMinor()));
+        protocolSession.getOutputHandler().sendCommand(evt.getChannelId(),
+                                                       createConnectionOpenBody( virtualHost, null, true));
     }
 
-    protected AMQFrame createConnectionOpenFrame(int channel, AMQShortString path, AMQShortString capabilities, boolean insist, byte major, byte minor)
+    protected ConnectionOpenBody createConnectionOpenBody(AMQShortString path, AMQShortString capabilities, boolean insist)
     {
-        // Be aware of possible changes to parameter order as versions change.
-        return ConnectionOpenBody.createAMQFrame(channel,
-            major, minor,	// AMQP version (major, minor)
+
+        return new ConnectionOpenBodyImpl(path,// virtualHost
             capabilities,	// capabilities
-            insist,	// insist
-            path);	// virtualHost
+            insist);	// insist
+
     }
 
-    protected AMQFrame createTuneOkFrame(int channel, ConnectionTuneParameters params, byte major, byte minor)
+    protected ConnectionTuneOkBody createTuneOkBody(ConnectionTuneParameters params)
     {
         // Be aware of possible changes to parameter order as versions change.
-        return ConnectionTuneOkBody.createAMQFrame(channel,
-            major, minor,
+        return new ConnectionTuneOkBodyImpl(
             params.getChannelMax(),	// channelMax
             params.getFrameMax(),	// frameMax
             params.getHeartbeat());	// heartbeat

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ConnectionTuneMethodHandler.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -15,11 +15,10 @@
  * limitations under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.ExchangeBoundOkBody;
@@ -42,13 +41,13 @@
      {
      }
 
-     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+     public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
      {
          if (_logger.isDebugEnabled())
          {
             ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod();
-            _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: " +
-                          body.replyText);
+            _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.getReplyCode() + " text: " +
+                          body.getReplyText());
          }
      }
 }

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/ExchangeBoundOkMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java Thu Mar 22 06:14:42 2007
@@ -15,11 +15,10 @@
  * limitations under the License.
  *
  */
-package org.apache.qpid.client.handler;
+package org.apache.qpid.client.handler.amqp_8_0;
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.StateAwareMethodListener;
 import org.apache.qpid.framing.QueueDeleteOkBody;
@@ -42,12 +41,12 @@
      {
      }
 
-     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+     public void methodReceived(AMQStateManager stateManager, AMQMethodEvent evt) throws AMQException
      {
          if (_logger.isDebugEnabled())
          {
             QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
-            _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
+            _logger.debug("Received Queue.Delete-Ok message, message count: " + body.getMessageCount());
          }
      }
 }

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/handler/amqp_8_0/QueueDeleteOkMethodHandler.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=auto&rev=521253
==============================================================================
--- incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (added)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Mar 22 06:14:42 2007
@@ -0,0 +1,36 @@
+package org.apache.qpid.client.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.state.AMQStateManager;
+
+/**
+ * Created by IntelliJ IDEA.
+ * User: U146758
+ * Date: 07-Mar-2007
+ * Time: 19:40:08
+ * To change this template use File | Settings | File Templates.
+ */
+public interface AMQProtocolHandler
+{
+    void writeFrame(AMQDataBlock frame);
+
+    void closeSession(AMQSession session) throws AMQException;
+
+    void closeConnection() throws AMQException;
+
+    AMQConnection getConnection();
+
+    AMQStateManager getStateManager();
+
+    AMQProtocolSession getProtocolSession();
+
+    ProtocolOutputHandler getOutputHandler();
+
+    ProtocolVersion getProtocolVersion();
+}

Copied: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java (from r511387, incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java?view=diff&rev=521253&p1=incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java&r1=511387&p2=incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java&r2=521253
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java Thu Mar 22 06:14:42 2007
@@ -43,26 +43,16 @@
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.pool.ReadWriteThreadModel;
-import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.apache.qpid.ssl.SSLContextFactory;
 
 
-public class AMQProtocolHandler extends IoHandlerAdapter
+public class AMQProtocolHandlerImpl extends IoHandlerAdapter implements AMQProtocolHandler
 {
-    private static final Logger _logger = Logger.getLogger(AMQProtocolHandler.class);
+    private static final Logger _logger = Logger.getLogger(AMQProtocolHandlerImpl.class);
 
     /**
      * The connection that this protocol handler is associated with. There is a 1-1 mapping between connection instances
@@ -75,7 +65,7 @@
 
     private AMQStateManager _stateManager = new AMQStateManager();
 
-    private final CopyOnWriteArraySet _frameListeners = new CopyOnWriteArraySet();
+
 
     /**
      * We create the failover handler when the session is created since it needs a reference to the IoSession in order
@@ -93,8 +83,9 @@
     private CountDownLatch _failoverLatch;
 
     private final long DEFAULT_SYNC_TIMEOUT = 1000 * 30;
+    private static final int CONTROL_CHANNEL = 0;
 
-    public AMQProtocolHandler(AMQConnection con)
+    public AMQProtocolHandlerImpl(AMQConnection con)
     {
         _connection = con;
     }
@@ -125,25 +116,21 @@
         }
 
 
-        try
-        {
 
-            ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
-            threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
-            threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
-        }
-        catch (RuntimeException e)
-        {
-            e.printStackTrace();
-        }
+        ReadWriteThreadModel threadModel = ReadWriteThreadModel.getInstance();
+        threadModel.getAsynchronousReadFilter().createNewJobForSession(session);
+        threadModel.getAsynchronousWriteFilter().createNewJobForSession(session);
+
 
         _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
+
+        // This starts the AMQP initiation by sending the AMQP Header
         _protocolSession.init();
     }
 
     public void sessionOpened(IoSession session) throws Exception
     {
-        //System.setProperty("foo", "bar");
+
     }
 
     /**
@@ -272,15 +259,8 @@
     public void propagateExceptionToWaiters(Exception e)
     {
         getStateManager().error(e);
-        if (!_frameListeners.isEmpty())
-        {
-            final Iterator it = _frameListeners.iterator();
-            while (it.hasNext())
-            {
-                final AMQMethodListener ml = (AMQMethodListener) it.next();
-                ml.error(e);
-            }
-        }
+        getProtocolSession().getOutputHandler().error(e);
+
     }
 
     private static int _messageReceivedCount;
@@ -290,7 +270,7 @@
         final boolean debug = _logger.isDebugEnabled();
         final long msgNumber = ++_messageReceivedCount;
 
-        if (debug && (msgNumber % 1000 == 0))
+        if (debug && (msgNumber % 1000 == CONTROL_CHANNEL))
         {
             _logger.debug("Received " + _messageReceivedCount + " protocol messages");
         }
@@ -303,45 +283,30 @@
 
         switch (bodyFrame.getFrameType())
         {
-            case AMQMethodBody.TYPE:
+            case AMQMethodBodyImpl.TYPE:
 
                 if (debug)
                 {
                     _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
                 }
 
-                final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+                final AMQMethodEvent<? extends AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
 
                 try
                 {
 
                     boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
-                    if (!_frameListeners.isEmpty())
-                    {
-                        Iterator it = _frameListeners.iterator();
-                        while (it.hasNext())
-                        {
-                            final AMQMethodListener listener = (AMQMethodListener) it.next();
-                            wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
-                        }
-                    }
+                    wasAnyoneInterested = getProtocolSession().getOutputHandler().methodReceived(evt)  || wasAnyoneInterested;
+
                     if (!wasAnyoneInterested)
                     {
-                        throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:" + _frameListeners);
+                        throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.");
                     }
                 }
                 catch (AMQException e)
                 {
                     getStateManager().error(e);
-                    if (!_frameListeners.isEmpty())
-                    {
-                        Iterator it = _frameListeners.iterator();
-                        while (it.hasNext())
-                        {
-                            final AMQMethodListener listener = (AMQMethodListener) it.next();
-                            listener.error(e);
-                        }
-                    }
+                    getProtocolSession().getOutputHandler().error(e);
                     exceptionCaught(session, e);
                 }
                 break;
@@ -380,7 +345,7 @@
 
         final boolean debug = _logger.isDebugEnabled();
 
-        if (debug && (sentMessages % 1000 == 0))
+        if (debug && (sentMessages % 1000 == CONTROL_CHANNEL))
         {
             _logger.debug("Sent " + _messagesOut + " protocol messages");
         }
@@ -418,70 +383,7 @@
         _protocolSession.writeFrame(frame);
     }
 
-    public void writeFrame(AMQDataBlock frame, boolean wait)
-    {
-        _protocolSession.writeFrame(frame, wait);
-    }
-
-    /**
-     * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
-     * calling getProtocolSession().write() then waiting for the response.
-     *
-     * @param frame
-     * @param listener the blocking listener. Note the calling thread will block.
-     */
-    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
-                                                           BlockingMethodFrameListener listener)
-            throws AMQException
-    {
-        return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
-    }
-
-    /**
-     * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
-     * calling getProtocolSession().write() then waiting for the response.
-     *
-     * @param frame
-     * @param listener the blocking listener. Note the calling thread will block.
-     */
-    public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame,
-                                                           BlockingMethodFrameListener listener, long timeout)
-            throws AMQException
-    {
-        try
-        {
-            _frameListeners.add(listener);
-            _protocolSession.writeFrame(frame);
-
-            AMQMethodEvent e = listener.blockForFrame(timeout);
-            return e;
-            // When control resumes before this line, a reply will have been received
-            // that matches the criteria defined in the blocking listener
-        }
-        catch (AMQException e)
-        {
-            throw e;
-        }
-        finally
-        {
-            // If we don't removeKey the listener then no-one will
-            _frameListeners.remove(listener);
-        }
-
-    }
-
-    /** More convenient method to write a frame and wait for it's response. */
-    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass) throws AMQException
-    {
-        return syncWrite(frame, responseClass, DEFAULT_SYNC_TIMEOUT);
-    }
 
-    /** More convenient method to write a frame and wait for it's response. */
-    public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException
-    {
-        return writeCommandFrameAndWaitForReply(frame,
-                                                new SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
-    }
 
     /**
      * Convenience method to register an AMQSession with the protocol handler. Registering a session with the protocol
@@ -519,20 +421,13 @@
     {
         getStateManager().changeState(AMQState.CONNECTION_CLOSING);
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        final AMQFrame frame = ConnectionCloseBody.createAMQFrame(0,
-                                                                  _protocolSession.getProtocolMajorVersion(),
-                                                                  _protocolSession.getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                                  0,    // classId
-                                                                  0,    // methodId
-                                                                  AMQConstant.REPLY_SUCCESS.getCode(),    // replyCode
-                                                                  new AMQShortString("JMS client is closing the connection."));    // replyText
 
         try
         {
-            syncWrite(frame, ConnectionCloseOkBody.class, timeout);
+            ConnectionCloseBody closeBody = getAMQMethodFactory().createConnectionClose();
+            sendCommandReceiveResponse(CONTROL_CHANNEL,closeBody);
+
+
             _protocolSession.closeProtocolSession();
         }
         catch (AMQTimeoutException e)
@@ -543,6 +438,16 @@
 
     }
 
+    private void sendCommandReceiveResponse(int channelId, AMQMethodBody command) throws AMQException
+    {
+        getOutputHandler().sendCommandReceiveResponse(channelId, command);
+    }
+
+    private AMQMethodFactory getAMQMethodFactory()
+    {
+        return getOutputHandler().getAMQMethodFactory();
+    }
+
     /** @return the number of bytes read from this protocol session */
     public long getReadBytes()
     {
@@ -607,6 +512,11 @@
         return _protocolSession;
     }
 
+    public ProtocolOutputHandler getOutputHandler()
+    {
+        return getProtocolSession().getOutputHandler();
+    }
+
     FailoverState getFailoverState()
     {
         return _failoverState;
@@ -617,14 +527,11 @@
         _failoverState = failoverState;
     }
 
-    public byte getProtocolMajorVersion()
+    public ProtocolVersion getProtocolVersion()
     {
-        return _protocolSession.getProtocolMajorVersion();
+        return _protocolSession.getProtocolVersion();
     }
 
 
-    public byte getProtocolMinorVersion()
-    {
-        return _protocolSession.getProtocolMinorVersion();
-    }
+    
 }

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/java.multi_version/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandlerImpl.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date