You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/08 11:10:11 UTC

svn commit: r619823 [12/19] - in /incubator/qpid/branches/thegreatmerge/qpid: ./ cpp/ dotnet/ dotnet/Qpid.Buffer.Tests/Properties/ dotnet/Qpid.Buffer/Properties/ dotnet/Qpid.Client.Tests/ dotnet/Qpid.Client.Tests/Channel/ dotnet/Qpid.Client.Tests/Commo...

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ExchangeBoundOkMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -33,7 +33,7 @@
 /**
  * @author Apache Software Foundation
  */
-public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener
+public class ExchangeBoundOkMethodHandler implements StateAwareMethodListener<ExchangeBoundOkBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(ExchangeBoundOkMethodHandler.class);
     private static final ExchangeBoundOkMethodHandler _instance = new ExchangeBoundOkMethodHandler();
@@ -46,14 +46,14 @@
     private ExchangeBoundOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
-        throws AMQException
+    public void methodReceived(AMQStateManager stateManager, ExchangeBoundOkBody body, int channelId)
+            throws AMQException
     {
         if (_logger.isDebugEnabled())
         {
-            ExchangeBoundOkBody body = (ExchangeBoundOkBody) evt.getMethod();
-            _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.replyCode + " text: "
-                + body.replyText);
+            _logger.debug("Received Exchange.Bound-Ok message, response code: " + body.getReplyCode() + " text: "
+                + body.getReplyText());
         }
     }
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/QueueDeleteOkMethodHandler.java Fri Feb  8 02:09:37 2008
@@ -33,7 +33,7 @@
 /**
  * @author Apache Software Foundation
  */
-public class QueueDeleteOkMethodHandler implements StateAwareMethodListener
+public class QueueDeleteOkMethodHandler implements StateAwareMethodListener<QueueDeleteOkBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(QueueDeleteOkMethodHandler.class);
     private static final QueueDeleteOkMethodHandler _instance = new QueueDeleteOkMethodHandler();
@@ -46,13 +46,14 @@
     private QueueDeleteOkMethodHandler()
     { }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
-        throws AMQException
-    {
+    public void methodReceived(AMQStateManager stateManager, QueueDeleteOkBody body, int channelId)
+            throws AMQException
+    {        
         if (_logger.isDebugEnabled())
         {
-            QueueDeleteOkBody body = (QueueDeleteOkBody) evt.getMethod();
-            _logger.debug("Received Queue.Delete-Ok message, message count: " + body.messageCount);
+            _logger.debug("Received Queue.Delete-Ok message, message count: " + body.getMessageCount());
         }
     }
+
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Fri Feb  8 02:09:37 2008
@@ -26,6 +26,7 @@
 
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.BasicDeliverBody;
+import org.apache.qpid.framing.BasicReturnBody;
 import org.apache.qpid.framing.ContentBody;
 import org.apache.qpid.framing.ContentHeaderBody;
 
@@ -51,6 +52,18 @@
         super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
     }
 
+    public UnprocessedMessage_0_8(int channelId, BasicReturnBody body)
+    {
+        //FIXME: TGM, SRSLY 4RL
+        super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false);
+    }
+
+    public UnprocessedMessage_0_8(int channelId, BasicDeliverBody body)
+    {
+        //FIXME: TGM, SRSLY 4RL
+        super(channelId, 0, null, body.getExchange(), body.getRoutingKey(), false);
+    }
+
     public void receiveBody(ContentBody body)
     {
 
@@ -119,8 +132,8 @@
         }
         if(_deliverBody != null)
         {
-            buf.append("Delivery tag " + _deliverBody.deliveryTag);
-            buf.append("Consumer tag " + _deliverBody.consumerTag);
+            buf.append("Delivery tag " + _deliverBody.getDeliveryTag());
+            buf.append("Consumer tag " + _deliverBody.getConsumerTag());
             buf.append("Deliver Body " + _deliverBody);
         }
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Feb  8 02:09:37 2008
@@ -21,10 +21,16 @@
 package org.apache.qpid.client.protocol;
 
 import org.apache.mina.common.IdleStatus;
+import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.ReadThrottleFilterBuilder;
 import org.apache.mina.filter.SSLFilter;
+import org.apache.mina.filter.WriteBufferLimitFilterBuilder;
+import org.apache.mina.filter.codec.ProtocolCodecException;
 import org.apache.mina.filter.codec.ProtocolCodecFilter;
+import org.apache.mina.filter.executor.ExecutorFilter;
+import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 import org.apache.qpid.AMQConnectionClosedException;
 import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
@@ -39,16 +45,7 @@
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.state.listener.SpecificMethodFrameListener;
 import org.apache.qpid.codec.AMQCodecFactory;
-import org.apache.qpid.framing.AMQBody;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.pool.ReadWriteThreadModel;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
@@ -58,6 +55,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 
@@ -214,6 +212,36 @@
             e.printStackTrace();
         }
 
+        if (!System.getProperties().containsKey("protectio") || Boolean.getBoolean("protectio"))
+        {
+            try
+            {
+                //Add IO Protection Filters
+                IoFilterChain chain = session.getFilterChain();
+
+                int buf_size = 32768;
+                if (session.getConfig() instanceof SocketSessionConfig)
+                {
+                    buf_size = ((SocketSessionConfig) session.getConfig()).getReceiveBufferSize();
+                }
+                session.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter());
+
+                ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder();
+                readfilter.setMaximumConnectionBufferSize(buf_size);
+                readfilter.attach(chain);
+
+                WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder();
+                writefilter.setMaximumConnectionBufferSize(buf_size * 2);
+                writefilter.attach(chain);
+                session.getFilterChain().remove("tempExecutorFilterForFilterBuilder");
+
+                _logger.info("Using IO Read/Write Filter Protection");
+            }
+            catch (Exception e)
+            {
+                _logger.error("Unable to attach IO Read/Write Filter Protection :" + e.getMessage());
+            }
+        }
         _protocolSession = new AMQProtocolSession(this, session, _connection, getStateManager());
         _protocolSession.init();
     }
@@ -380,94 +408,109 @@
 
     public void messageReceived(IoSession session, Object message) throws Exception
     {
-        final boolean debug = _logger.isDebugEnabled();
-        final long msgNumber = ++_messageReceivedCount;
-
-        if (debug && ((msgNumber % 1000) == 0))
+        if(message instanceof AMQFrame)
         {
-            _logger.debug("Received " + _messageReceivedCount + " protocol messages");
-        }
+            final boolean debug = _logger.isDebugEnabled();
+            final long msgNumber = ++_messageReceivedCount;
 
-        AMQFrame frame = (AMQFrame) message;
+            if (debug && ((msgNumber % 1000) == 0))
+            {
+                _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+            }
 
-        final AMQBody bodyFrame = frame.getBodyFrame();
+            AMQFrame frame = (AMQFrame) message;
 
-        HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+            final AMQBody bodyFrame = frame.getBodyFrame();
 
-        switch (bodyFrame.getFrameType())
-        {
-            case AMQMethodBody.TYPE:
+            HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
 
-                if (debug)
-                {
-                    _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
-                }
+            switch (bodyFrame.getFrameType())
+            {
+                case AMQMethodBody.TYPE:
 
-                final AMQMethodEvent<AMQMethodBody> evt =
-                        new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+                    if (debug)
+                    {
+                        _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
+                    }
 
-                try
-                {
+                    final AMQMethodEvent<AMQMethodBody> evt =
+                            new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
 
-                    boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
-                    if (!_frameListeners.isEmpty())
+                    try
                     {
-                        Iterator it = _frameListeners.iterator();
-                        while (it.hasNext())
+
+                        boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+                        if (!_frameListeners.isEmpty())
                         {
-                            final AMQMethodListener listener = (AMQMethodListener) it.next();
-                            wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+                            Iterator it = _frameListeners.iterator();
+                            while (it.hasNext())
+                            {
+                                final AMQMethodListener listener = (AMQMethodListener) it.next();
+                                wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+                            }
                         }
-                    }
 
-                    if (!wasAnyoneInterested)
-                    {
-                        throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"
-                                                     + _frameListeners, null);
+                        if (!wasAnyoneInterested)
+                        {
+                            throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"
+                                                   + _frameListeners, null);
+                        }
                     }
-                }
-                catch (AMQException e)
-                {
-                    getStateManager().error(e);
-                    if (!_frameListeners.isEmpty())
+                    catch (AMQException e)
                     {
-                        Iterator it = _frameListeners.iterator();
-                        while (it.hasNext())
+                        getStateManager().error(e);
+                        if (!_frameListeners.isEmpty())
                         {
-                            final AMQMethodListener listener = (AMQMethodListener) it.next();
-                            listener.error(e);
+                            Iterator it = _frameListeners.iterator();
+                            while (it.hasNext())
+                            {
+                                final AMQMethodListener listener = (AMQMethodListener) it.next();
+                                listener.error(e);
+                            }
                         }
+
+                        exceptionCaught(session, e);
                     }
 
-                    exceptionCaught(session, e);
-                }
+                    break;
 
-                break;
+                case ContentHeaderBody.TYPE:
 
-            case ContentHeaderBody.TYPE:
+                    _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
+                    break;
 
-                _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
-                break;
+                case ContentBody.TYPE:
 
-            case ContentBody.TYPE:
+                    _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
+                    break;
 
-                _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
-                break;
+                case HeartbeatBody.TYPE:
 
-            case HeartbeatBody.TYPE:
+                    if (debug)
+                    {
+                        _logger.debug("Received heartbeat");
+                    }
 
-                if (debug)
-                {
-                    _logger.debug("Received heartbeat");
-                }
+                    break;
 
-                break;
+                default:
 
-            default:
+            }
 
+            _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
         }
+        else if (message instanceof ProtocolInitiation)
+        {
+            // We get here if the server sends a response to our initial protocol header
+            // suggesting an alternate ProtocolVersion; the server will then close the
+            // connection.
+            ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+            ProtocolVersion pv = protocolInit.checkVersion();
+            getConnection().setProtocolVersion(pv);
 
-        _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+            // get round a bug in old versions of qpid whereby the connection is not closed
+            _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+        }
     }
 
     private static int _messagesOut;
@@ -506,6 +549,12 @@
         getStateManager().attainState(s);
     }
 
+    public AMQState attainState(Set<AMQState> states) throws AMQException
+    {
+        return getStateManager().attainState(states);
+    }
+
+
     /**
      * Convenience method that writes a frame to the protocol session. Equivalent to calling
      * getProtocolSession().write().
@@ -600,16 +649,11 @@
     {
         getStateManager().changeState(AMQState.CONNECTION_CLOSING);
 
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
-        // Be aware of possible changes to parameter order as versions change.
-        final AMQFrame frame =
-                ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(),
-                                                   _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
-                                                   0, // classId
-                                                   0, // methodId
-                                                   AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                                                   new AMQShortString("JMS client is closing the connection.")); // replyText
+        ConnectionCloseBody body = _protocolSession.getMethodRegistry().createConnectionCloseBody(AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+                new AMQShortString("JMS client is closing the connection."),0,0);
+
+
+        final AMQFrame frame = body.generateFrame(0);
 
         try
         {
@@ -708,5 +752,15 @@
     public byte getProtocolMinorVersion()
     {
         return _protocolSession.getProtocolMinorVersion();
+    }
+
+    public MethodRegistry getMethodRegistry()
+    {
+        return getStateManager().getMethodRegistry();
+    }
+
+    public ProtocolVersion getProtocolVersion()
+    {
+        return _protocolSession.getProtocolVersion();
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Feb  8 02:09:37 2008
@@ -21,40 +21,31 @@
 package org.apache.qpid.client.protocol;
 
 import org.apache.commons.lang.StringUtils;
-
 import org.apache.mina.common.CloseFuture;
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.security.sasl.SaslClient;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQSession;
 import org.apache.qpid.client.ConnectionTuneParameters;
-// import org.apache.qpid.client.message.UnexpectedBodyReceivedException;
 import org.apache.qpid.client.message.ReturnMessage;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.message.UnprocessedMessage_0_8;
 import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.MainRegistry;
-import org.apache.qpid.framing.ProtocolInitiation;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.framing.VersionSpecificRegistry;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.JMSException;
-import javax.security.sasl.SaslClient;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
 
 /**
  * Wrapper for protocol session that provides type-safe access to session attributes. <p/> The underlying protocol
@@ -101,12 +92,19 @@
     protected int _queueId = 1;
     protected final Object _queueIdLock = new Object();
 
-    private byte _protocolMinorVersion;
-    private byte _protocolMajorVersion;
-    private VersionSpecificRegistry _registry =
-        MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
+    private ProtocolVersion _protocolVersion;
+//    private VersionSpecificRegistry _registry =
+//        MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
+
+
+    private MethodRegistry _methodRegistry =
+            MethodRegistry.getMethodRegistry(ProtocolVersion.getLatestSupportedVersion());
 
-    private final AMQConnection _connection;
+
+    private MethodDispatcher _methodDispatcher;
+
+
+    private final AMQConnection _connection;    
 
     public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
     {
@@ -126,6 +124,9 @@
         _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
         _stateManager = stateManager;
         _stateManager.setProtocolSession(this);
+        _protocolVersion = connection.getProtocolVersion();
+        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
+                                                                 stateManager);
         _connection = connection;
 
     }
@@ -135,7 +136,7 @@
         // start the process of setting up the connection. This is the first place that
         // data is written to the server.
 
-        _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+        _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
     }
 
     public String getClientID()
@@ -164,6 +165,8 @@
     public void setStateManager(AMQStateManager stateManager)
     {
         _stateManager = stateManager;
+        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(_protocolVersion,
+                                                                 stateManager);         
     }
 
     public String getVirtualHost()
@@ -440,26 +443,55 @@
         session.confirmConsumerCancelled(consumerTag);
     }
 
-    public void setProtocolVersion(final byte versionMajor, final byte versionMinor)
+    public void setProtocolVersion(final ProtocolVersion pv)
     {
-        _protocolMajorVersion = versionMajor;
-        _protocolMinorVersion = versionMinor;
-        _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
+        _protocolVersion = pv;
+        _methodRegistry = MethodRegistry.getMethodRegistry(pv);
+        _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(pv, _stateManager);
+
+      //  _registry = MainRegistry.getVersionSpecificRegistry(versionMajor, versionMinor);
     }
 
     public byte getProtocolMinorVersion()
     {
-        return _protocolMinorVersion;
+        return _protocolVersion.getMinorVersion();
     }
 
     public byte getProtocolMajorVersion()
     {
-        return _protocolMajorVersion;
+        return _protocolVersion.getMajorVersion();
+    }
+
+    public ProtocolVersion getProtocolVersion()
+    {
+        return _protocolVersion;
     }
 
-    public VersionSpecificRegistry getRegistry()
+//    public VersionSpecificRegistry getRegistry()
+//    {
+//        return _registry;
+//    }
+
+    public MethodRegistry getMethodRegistry()
     {
-        return _registry;
+        return _methodRegistry;
     }
 
+    public MethodDispatcher getMethodDispatcher()
+    {
+        return _methodDispatcher;
+    }
+
+
+    public void setTicket(int ticket, int channelId)
+    {
+        final AMQSession session = getSession(channelId);
+        session.setTicket(ticket);
+    }
+
+
+    public void setMethodDispatcher(MethodDispatcher methodDispatcher)
+    {
+        _methodDispatcher = methodDispatcher;
+    }
 }

Copied: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java (from r616809, incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java)
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java?p2=incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java&p1=incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java&r1=616809&r2=619823&rev=619823&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQMethodNotImplementedException.java Fri Feb  8 02:09:37 2008
@@ -27,6 +27,6 @@
 {
     public AMQMethodNotImplementedException(AMQMethodBody body)
     {
-        super("Unexpected Method Received: " + body.getClass().getName());
+        super(null, "Unexpected Method Received: " + body.getClass().getName(), null);
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java Fri Feb  8 02:09:37 2008
@@ -24,8 +24,22 @@
  * States used in the AMQ protocol. Used by the finite state machine to determine
  * valid responses.
  */
-public class AMQState
+public enum AMQState
 {
+
+    CONNECTION_NOT_STARTED(1, "CONNECTION_NOT_STARTED"),
+
+    CONNECTION_NOT_TUNED(2, "CONNECTION_NOT_TUNED"),
+
+    CONNECTION_NOT_OPENED(3, "CONNECTION_NOT_OPENED"),
+
+    CONNECTION_OPEN(4, "CONNECTION_OPEN"),
+
+    CONNECTION_CLOSING(5, "CONNECTION_CLOSING"),
+
+    CONNECTION_CLOSED(6, "CONNECTION_CLOSED");
+
+
     private final int _id;
 
     private final String _name;
@@ -41,16 +55,6 @@
         return "AMQState: id = " + _id + " name: " + _name;
     }
 
-    public static final AMQState CONNECTION_NOT_STARTED = new AMQState(1, "CONNECTION_NOT_STARTED");
-    
-    public static final AMQState CONNECTION_NOT_TUNED = new AMQState(2, "CONNECTION_NOT_TUNED");
-    
-    public static final AMQState CONNECTION_NOT_OPENED = new AMQState(3, "CONNECTION_NOT_OPENED");        
-
-    public static final AMQState CONNECTION_OPEN = new AMQState(4, "CONNECTION_OPEN");
-
-    public static final AMQState CONNECTION_CLOSING = new AMQState(5, "CONNECTION_CLOSING");
-    
-    public static final AMQState CONNECTION_CLOSED = new AMQState(6, "CONNECTION_CLOSED");
-    
+
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Fri Feb  8 02:09:37 2008
@@ -21,42 +21,15 @@
 package org.apache.qpid.client.state;
 
 import org.apache.qpid.AMQException;
-import org.apache.qpid.client.handler.BasicCancelOkMethodHandler;
-import org.apache.qpid.client.handler.BasicDeliverMethodHandler;
-import org.apache.qpid.client.handler.BasicReturnMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseMethodHandler;
-import org.apache.qpid.client.handler.ChannelCloseOkMethodHandler;
-import org.apache.qpid.client.handler.ChannelFlowOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionCloseMethodHandler;
-import org.apache.qpid.client.handler.ConnectionOpenOkMethodHandler;
-import org.apache.qpid.client.handler.ConnectionSecureMethodHandler;
-import org.apache.qpid.client.handler.ConnectionStartMethodHandler;
-import org.apache.qpid.client.handler.ConnectionTuneMethodHandler;
-import org.apache.qpid.client.handler.ExchangeBoundOkMethodHandler;
-import org.apache.qpid.client.handler.QueueDeleteOkMethodHandler;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
-import org.apache.qpid.framing.AMQMethodBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.framing.BasicReturnBody;
-import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
-import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQMethodEvent;
 import org.apache.qpid.protocol.AMQMethodListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 /**
@@ -72,11 +45,11 @@
     /** The current state */
     private AMQState _currentState;
 
+
     /**
      * Maps from an AMQState instance to a Map from Class to StateTransitionHandler. The class must be a subclass of
      * AMQFrame.
      */
-    protected final Map _state2HandlersMap = new HashMap();
 
     private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
     private final Object _stateLock = new Object();
@@ -96,53 +69,10 @@
     {
         _protocolSession = protocolSession;
         _currentState = state;
-        if (register)
-        {
-            registerListeners();
-        }
+
     }
 
-    protected void registerListeners()
-    {
-        Map frame2handlerMap = new HashMap();
 
-        // we need to register a map for the null (i.e. all state) handlers otherwise you get
-        // a stack overflow in the handler searching code when you present it with a frame for which
-        // no handlers are registered
-        //
-        _state2HandlersMap.put(null, frame2handlerMap);
-
-        frame2handlerMap = new HashMap();
-        frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance());
-        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
-        _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
-
-        frame2handlerMap = new HashMap();
-        frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance());
-        frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance());
-        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
-        _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
-
-        frame2handlerMap = new HashMap();
-        frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance());
-        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
-        _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
-
-        //
-        // ConnectionOpen handlers
-        //
-        frame2handlerMap = new HashMap();
-        frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandler.getInstance());
-        frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
-        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
-        frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
-        frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
-        frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
-        frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
-        frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
-        frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
-        _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
-    }
 
     public AMQState getCurrentState()
     {
@@ -176,56 +106,14 @@
 
     public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
     {
-        StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
-        if (handler != null)
-        {
-            handler.methodReceived(this, _protocolSession, evt);
 
-            return true;
-        }
-
-        return false;
+        B method = evt.getMethod();
+        
+        //    StateAwareMethodListener handler = findStateTransitionHandler(_currentState, evt.getMethod());
+        method.execute(_protocolSession.getMethodDispatcher(), evt.getChannelId());
+        return true;
     }
 
-    protected StateAwareMethodListener findStateTransitionHandler(AMQState currentState, AMQMethodBody frame)
-    // throws IllegalStateTransitionException
-    {
-        final Class clazz = frame.getClass();
-        if (_logger.isDebugEnabled())
-        {
-            _logger.debug("Looking for state[" + currentState + "] transition handler for frame " + clazz);
-        }
-
-        final Map classToHandlerMap = (Map) _state2HandlersMap.get(currentState);
-
-        if (classToHandlerMap == null)
-        {
-            // if no specialised per state handler is registered look for a
-            // handler registered for "all" states
-            return findStateTransitionHandler(null, frame);
-        }
-
-        final StateAwareMethodListener handler = (StateAwareMethodListener) classToHandlerMap.get(clazz);
-        if (handler == null)
-        {
-            if (currentState == null)
-            {
-                _logger.debug("No state transition handler defined for receiving frame " + frame);
-
-                return null;
-            }
-            else
-            {
-                // if no specialised per state handler is registered look for a
-                // handler registered for "all" states
-                return findStateTransitionHandler(null, frame);
-            }
-        }
-        else
-        {
-            return handler;
-        }
-    }
 
     public void attainState(final AMQState s) throws AMQException
     {
@@ -271,5 +159,47 @@
     public void setProtocolSession(AMQProtocolSession session)
     {
         _protocolSession = session;
+    }
+
+    public MethodRegistry getMethodRegistry()
+    {
+        return getProtocolSession().getMethodRegistry();
+    }
+
+    public AMQState attainState(Set<AMQState> stateSet) throws AMQException
+    {
+        synchronized (_stateLock)
+        {
+            final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
+            long waitTime = MAXIMUM_STATE_WAIT_TIME;
+
+            while (!stateSet.contains(_currentState) && (waitTime > 0))
+            {
+                try
+                {
+                    _stateLock.wait(MAXIMUM_STATE_WAIT_TIME);
+                }
+                catch (InterruptedException e)
+                {
+                    _logger.warn("Thread interrupted");
+                }
+
+                if (!stateSet.contains(_currentState))
+                {
+                    waitTime = waitUntilTime - System.currentTimeMillis();
+                }
+            }
+
+            if (!stateSet.contains(_currentState))
+            {
+                _logger.warn("State not achieved within permitted time.  Current state " + _currentState
+                             + ", desired state: " + stateSet);
+                throw new AMQException(null, "State not achieved within permitted time.  Current state " + _currentState
+                                       + ", desired state: " + stateSet, null);
+            }
+            return _currentState;
+        }
+
+
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateAwareMethodListener.java Fri Feb  8 02:09:37 2008
@@ -21,6 +21,7 @@
 package org.apache.qpid.client.state;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
@@ -29,8 +30,9 @@
  * the opportunity to update state.
  *
  */
-public interface StateAwareMethodListener
+public interface StateAwareMethodListener<B extends AMQMethodBody>
 {
-    void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession,
-        AMQMethodEvent evt) throws AMQException;
+
+    void methodReceived(AMQStateManager stateManager, B body, int channelId) throws AMQException;
+
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Fri Feb  8 02:09:37 2008
@@ -23,6 +23,7 @@
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
 import org.apache.mina.transport.socket.nio.SocketConnector;
 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
@@ -34,7 +35,6 @@
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 /**
@@ -95,28 +95,26 @@
                     {
                         SocketConnector result;
                         // FIXME - this needs to be sorted to use the new Mina MultiThread SA.
-                        if (Boolean.getBoolean("qpidnio"))
+                        if (!System.getProperties().containsKey("qpidnio") || Boolean.getBoolean("qpidnio"))
                         {
-                            _logger.error("Using Qpid NIO - sysproperty 'qpidnio' is set.");
-                            // result = new org.apache.qpid.nio.SocketConnector(); // non-blocking connector
+                            _logger.warn("Using Qpid MultiThreaded NIO - " + (System.getProperties().containsKey("qpidnio")
+                                                                 ? "Qpid NIO is new default"
+                                                                 : "Sysproperty 'qpidnio' is set"));
+                            result = new MultiThreadSocketConnector();
                         }
-                        // else
-
+                        else
                         {
                             _logger.info("Using Mina NIO");
                             result = new SocketConnector(); // non-blocking connector
                         }
-
                         // Don't have the connector's worker thread wait around for other connections (we only use
                         // one SocketConnector per connection at the moment anyway). This allows short-running
                         // clients (like unit tests) to complete quickly.
                         result.setWorkerTimeout(0);
-
                         return result;
                     }
                 });
                 break;
-
             case VM:
             {
                 _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
@@ -151,7 +149,15 @@
         {
             if (AutoCreate)
             {
-                createVMBroker(port);
+                if (AutoCreate)
+                {
+                    createVMBroker(port);
+                }
+                else
+                {
+                    throw new AMQVMBrokerCreationException(null, port, "VM Broker on port " + port
+                                                                       + " does not exist. Auto create disabled.", null);
+                }
             }
             else
             {
@@ -271,8 +277,7 @@
             }
 
             AMQVMBrokerCreationException amqbce =
-                    new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", null);
-            amqbce.initCause(e);
+                    new AMQVMBrokerCreationException(null, port, because + " Stopped InVM Qpid.AMQP creation", e);
             throw amqbce;
         }
 
@@ -282,16 +287,17 @@
     public static void killAllVMBrokers()
     {
         _logger.info("Killing all VM Brokers");
-        _acceptor.unbindAll();
-
-        Iterator keys = _inVmPipeAddress.keySet().iterator();
-
-        while (keys.hasNext())
+        if (_acceptor != null)
         {
-            int id = (Integer) keys.next();
-            _inVmPipeAddress.remove(id);
+        	_acceptor.unbindAll();
         }
-
+        synchronized (_inVmPipeAddress)
+        {
+            _inVmPipeAddress.clear();
+        }        
+        _acceptor = null;
+        _currentInstance = -1;
+        _currentVMPort = -1;
     }
 
     public static void killVMBroker(int port)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/VmPipeTransportConnection.java Fri Feb  8 02:09:37 2008
@@ -22,15 +22,12 @@
 
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.vmpipe.QpidVmPipeConnector;
 import org.apache.mina.transport.vmpipe.VmPipeAddress;
 import org.apache.mina.transport.vmpipe.VmPipeConnector;
-
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.pool.PoolingFilter;
-import org.apache.qpid.pool.ReferenceCountingExecutorService;
 import org.apache.qpid.pool.ReadWriteThreadModel;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,10 +46,10 @@
 
     public void connect(AMQProtocolHandler protocolHandler, BrokerDetails brokerDetail) throws IOException
     {
-        final VmPipeConnector ioConnector = new VmPipeConnector();
+        final VmPipeConnector ioConnector = new QpidVmPipeConnector();
         final IoServiceConfig cfg = ioConnector.getDefaultConfig();
 
-        cfg.setThreadModel(ReadWriteThreadModel.getInstance());             
+        cfg.setThreadModel(ReadWriteThreadModel.getInstance());
 
         final VmPipeAddress address = new VmPipeAddress(_port);
         _logger.info("Attempting connection to " + address);

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java Fri Feb  8 02:09:37 2008
@@ -33,6 +33,7 @@
     */
     public static final String OPTIONS_RETRY = "retries";
     public static final String OPTIONS_CONNECT_TIMEOUT = "connecttimeout";
+    public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
     public static final int DEFAULT_PORT = 5672;
 
     public static final String TCP = "tcp";

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Fri Feb  8 02:09:37 2008
@@ -21,6 +21,7 @@
 package org.apache.qpid.jms;
 
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
 
 import java.util.List;
 
@@ -43,6 +44,7 @@
     public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
     public static final byte  URL_0_8 = 1;
     public static final byte  URL_0_10 = 2;
+    public static final String OPTIONS_PROTOCOL_VERSION = "protocolVersion";
 
     byte getURLVersion();
 
@@ -91,4 +93,6 @@
     AMQShortString getTemporaryQueueExchangeName();
 
     AMQShortString getTemporaryTopicExchangeName();
+
+    ProtocolVersion getProtocolVersion();
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverRoundRobinServers.java Fri Feb  8 02:09:37 2008
@@ -22,7 +22,6 @@
 
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionURL;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,34 +34,22 @@
     /** The default number of times to retry each server */
     public static final int DEFAULT_SERVER_RETRIES = 0;
 
-    /**
-      * The index into the hostDetails array of the broker to which we are connected
-      */
+    /** The index into the hostDetails array of the broker to which we are connected */
     private int _currentBrokerIndex = -1;
 
-    /**
-     * The number of times to retry connecting for each server
-     */
+    /** The number of times to retry connecting for each server */
     private int _serverRetries;
 
-    /**
-     * The current number of retry attempts made
-     */
+    /** The current number of retry attempts made */
     private int _currentServerRetry;
 
-    /**
-     *  The number of times to cycle through the servers
-     */
+    /** The number of times to cycle through the servers */
     private int _cycleRetries;
 
-    /**
-     * The current number of cycles performed.
-     */
+    /** The current number of cycles performed. */
     private int _currentCycleRetries;
 
-    /**
-     * Array of BrokerDetail used to make connections.
-     */
+    /** Array of BrokerDetail used to make connections. */
     private ConnectionURL _connectionDetails;
 
     public FailoverRoundRobinServers(ConnectionURL connectionDetails)
@@ -128,6 +115,8 @@
 
     public BrokerDetails getNextBrokerDetails()
     {
+        boolean doDelay = false;
+
         if (_currentBrokerIndex == (_connectionDetails.getBrokerCount() - 1))
         {
             if (_currentServerRetry < _serverRetries)
@@ -143,6 +132,7 @@
                 else
                 {
                     _logger.info("Retrying " + _connectionDetails.getBrokerDetails(_currentBrokerIndex));
+                    doDelay=true;
                 }
 
                 _currentServerRetry++;
@@ -175,6 +165,7 @@
                 else
                 {
                     _logger.info("Retrying " + _connectionDetails.getBrokerDetails(_currentBrokerIndex));
+                    doDelay=true;
                 }
 
                 _currentServerRetry++;
@@ -189,7 +180,28 @@
             }
         }
 
-        return _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+        BrokerDetails broker = _connectionDetails.getBrokerDetails(_currentBrokerIndex);
+
+        String delayStr = broker.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
+        if (delayStr != null && doDelay)
+        {
+            Long delay = Long.parseLong(delayStr);
+            _logger.info("Delay between connect retries:" + delay);
+            try
+            {
+                Thread.sleep(delay);
+            }
+            catch (InterruptedException ie)
+            {
+                return null;
+            }
+        }
+        else
+        {
+            _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable.");
+        }
+
+        return broker;
     }
 
     public void setBroker(BrokerDetails broker)

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/failover/FailoverSingleServer.java Fri Feb  8 02:09:37 2008
@@ -22,25 +22,23 @@
 
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionURL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class FailoverSingleServer implements FailoverMethod
 {
+    private static final Logger _logger = LoggerFactory.getLogger(FailoverSingleServer.class);
+
     /** The default number of times to rety a conection to this server */
     public static final int DEFAULT_SERVER_RETRIES = 1;
 
-    /**
-     * The details of the Single Server
-     */
+    /** The details of the Single Server */
     private BrokerDetails _brokerDetail;
 
-    /**
-     * The number of times to retry connecting to the sever
-     */
+    /** The number of times to retry connecting to the sever */
     private int _retries;
 
-    /**
-     * The current number of attempts made to the server
-     */
+    /** The current number of attempts made to the server */
     private int _currentRetries;
 
 
@@ -78,7 +76,7 @@
 
     public BrokerDetails getCurrentBrokerDetails()
     {
-       return _brokerDetail;
+        return _brokerDetail;
     }
 
     public BrokerDetails getNextBrokerDetails()
@@ -91,11 +89,29 @@
         {
             if (_currentRetries < _retries)
             {
-                _currentRetries ++;
+                _currentRetries++;
             }
+        }
+
 
-            return _brokerDetail;
+        String delayStr = _brokerDetail.getProperty(BrokerDetails.OPTIONS_CONNECT_DELAY);
+        if (delayStr != null && _currentRetries != 1)
+        {
+            Long delay = Long.parseLong(delayStr);
+            _logger.info("Delay between connect retries:" + delay);
+            try
+            {
+
+                Thread.sleep(delay);
+            }
+            catch (InterruptedException ie)
+            {
+                _logger.info("No delay between connect retries, use tcp://host:port?connectdelay='value' to enable.");
+                return null;
+            }
         }
+
+        return _brokerDetail;
     }
 
     public void setBroker(BrokerDetails broker)
@@ -138,10 +154,10 @@
 
     public String toString()
     {
-        return "SingleServer:\n"+
-                "Max Retries:"+_retries+
-                "\nCurrent Retry:"+_currentRetries+
-                "\n"+_brokerDetail+"\n";
+        return "SingleServer:\n" +
+               "Max Retries:" + _retries +
+               "\nCurrent Retry:" + _currentRetries +
+               "\n" + _brokerDetail + "\n";
     }
 
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Fri Feb  8 02:09:37 2008
@@ -99,8 +99,6 @@
             _logger.warn("Unable to load property file specified in Provider_URL:" + environment.get(Context.PROVIDER_URL));
         }
 
-        
-
         createConnectionFactories(data, environment);
 
         createDestinations(data, environment);

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/AMQQueueDeferredOrderingTest.java Fri Feb  8 02:09:37 2008
@@ -126,7 +126,7 @@
         _logger.info("Consuming messages");
         for (int i = 0; i < NUM_MESSAGES; i++)
         {
-            Message msg = consumer.receive(1500);
+            Message msg = consumer.receive(3000);
             assertNotNull("Message should not be null", msg);
             assertTrue("Message should be a text message", msg instanceof TextMessage);
             assertEquals("Message content does not match expected", Integer.toString(i), ((TextMessage) msg).getText());

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerMultiConsumerTest.java Fri Feb  8 02:09:37 2008
@@ -20,13 +20,17 @@
  */
 package org.apache.qpid.client;
 
+import junit.framework.TestCase;
 
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
 import org.apache.qpid.testutil.QpidTestCase;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -34,7 +38,9 @@
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
 
+import java.util.Hashtable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/MessageListenerTest.java Fri Feb  8 02:09:37 2008
@@ -20,13 +20,17 @@
  */
 package org.apache.qpid.client;
 
+import junit.framework.TestCase;
 
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
 import org.apache.qpid.testutil.QpidTestCase;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -34,6 +38,9 @@
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
+
+import java.util.Hashtable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/client/ResetMessageListenerTest.java Fri Feb  8 02:09:37 2008
@@ -29,7 +29,16 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.naming.Context;
 import javax.naming.spi.InitialContextFactory;
 
@@ -65,12 +74,15 @@
     private final CountDownLatch _allFirstMessagesSent = new CountDownLatch(2); // all messages Sent Lock
     private final CountDownLatch _allSecondMessagesSent = new CountDownLatch(2); // all messages Sent Lock
     private final CountDownLatch _allFirstMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
-       private final CountDownLatch _allSecondMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
+	private final CountDownLatch _allSecondMessagesSent010 = new CountDownLatch(MSG_COUNT); // all messages Sent Lock
+    
+    private String oldImmediatePrefetch;
 
     protected void setUp() throws Exception
     {
         super.setUp();
 
+        oldImmediatePrefetch = System.getProperty(AMQSession.IMMEDIATE_PREFETCH);
         System.setProperty(AMQSession.IMMEDIATE_PREFETCH, "true");
 
         _clientConnection = getConnection("guest", "guest");
@@ -109,8 +121,12 @@
     {
         _clientConnection.close();
 
-        _producerConnection.close();
         super.tearDown();
+        if (oldImmediatePrefetch == null)
+        {
+            oldImmediatePrefetch = AMQSession.IMMEDIATE_PREFETCH_DEFAULT;
+        }
+        System.setProperty(AMQSession.IMMEDIATE_PREFETCH, oldImmediatePrefetch);
     }
 
     public void testAsynchronousRecieve()
@@ -238,7 +254,7 @@
 
             try
             {
-                _allSecondMessagesSent.await(1000, TimeUnit.MILLISECONDS);
+            _allSecondMessagesSent.await(5000, TimeUnit.MILLISECONDS);
             }
             catch (InterruptedException e)
             {

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java Fri Feb  8 02:09:37 2008
@@ -37,7 +37,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener
+public class ChannelCloseMethodHandlerNoCloseOk implements StateAwareMethodListener<ChannelCloseBody>
 {
     private static final Logger _logger = LoggerFactory.getLogger(ChannelCloseMethodHandlerNoCloseOk.class);
 
@@ -48,14 +48,15 @@
         return _handler;
     }
 
-    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt)
+    public void methodReceived(AMQStateManager stateManager,  ChannelCloseBody method, int channelId)
         throws AMQException
     {
         _logger.debug("ChannelClose method received");
-        ChannelCloseBody method = (ChannelCloseBody) evt.getMethod();
+        final AMQProtocolSession session = stateManager.getProtocolSession();
 
-        AMQConstant errorCode = AMQConstant.getConstant(method.replyCode);
-        AMQShortString reason = method.replyText;
+
+        AMQConstant errorCode = AMQConstant.getConstant(method.getReplyCode());
+        AMQShortString reason = method.getReplyText();
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Channel close reply code: " + errorCode + ", reason: " + reason);
@@ -95,6 +96,6 @@
 
         }
 
-        protocolSession.channelClosed(evt.getChannelId(), errorCode, String.valueOf(reason));
+        session.channelClosed(channelId, errorCode, String.valueOf(reason));
     }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java Fri Feb  8 02:09:37 2008
@@ -25,17 +25,13 @@
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQTimeoutException;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
 import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.protocol.AMQProtocolSession;
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.url.URLSyntaxException;
@@ -52,6 +48,9 @@
 import javax.jms.Queue;
 import javax.jms.Session;
 import javax.jms.TextMessage;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
 
 public class ChannelCloseTest extends TestCase implements ExceptionListener, ConnectionListener
 {
@@ -135,8 +134,11 @@
 
     /*
     close channel and send guff then send ok no errors
+    REMOVE TEST - The behaviour after server has sent close is undefined.
+    the server should be free to fail as it may wish to reclaim its resources
+    immediately after close.
      */
-    public void testSendingMethodsAfterClose() throws Exception
+    /*public void testSendingMethodsAfterClose() throws Exception
     {
         try
         {
@@ -158,6 +160,17 @@
 
             // Set StateManager to manager that ignores Close-oks
             AMQProtocolSession protocolSession = ((AMQConnection) _connection).getProtocolHandler().getProtocolSession();
+
+            MethodDispatcher d = protocolSession.getMethodDispatcher();
+
+            MethodDispatcher wrappedDispatcher = (MethodDispatcher)
+                    Proxy.newProxyInstance(d.getClass().getClassLoader(),
+                                           d.getClass().getInterfaces(),
+                                           new MethodDispatcherProxyHandler(
+                                                   (ClientMethodDispatcherImpl) d));
+
+            protocolSession.setMethodDispatcher(wrappedDispatcher);
+
             AMQStateManager newStateManager = new NoCloseOKStateManager(protocolSession);
             newStateManager.changeState(oldStateManager.getCurrentState());
 
@@ -247,7 +260,7 @@
             }
         }
     }
-
+*/
     private void createChannelAndTest(int channel) throws FailoverException
     {
         // Create A channel
@@ -274,10 +287,9 @@
 
     private void sendClose(int channel)
     {
-        AMQFrame frame =
-            ChannelCloseOkBody.createAMQFrame(channel,
-                ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
-                ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion());
+        ChannelCloseOkBody body =
+                ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelCloseOkBody();
+        AMQFrame frame = body.generateFrame(channel);
 
         ((AMQConnection) _connection).getProtocolHandler().writeFrame(frame);
     }
@@ -335,35 +347,43 @@
     private void declareExchange(int channelId, String _type, String _name, boolean nowait)
         throws AMQException, FailoverException
     {
-        AMQFrame exchangeDeclare =
-            ExchangeDeclareBody.createAMQFrame(channelId,
-                ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
-                ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments
-                false, // autoDelete
-                false, // durable
-                new AMQShortString(_name), // exchange
-                false, // internal
-                nowait, // nowait
-                true, // passive
-                0, // ticket
-                new AMQShortString(_type)); // type
+        ExchangeDeclareBody body =
+                ((AMQConnection) _connection).getProtocolHandler()
+                        .getMethodRegistry()
+                        .createExchangeDeclareBody(0,
+                                                   new AMQShortString(_name),
+                                                   new AMQShortString(_type),
+                                                   true,
+                                                   false,
+                                                   false,
+                                                   false,
+                                                   nowait,
+                                                   null);
+                AMQFrame exchangeDeclare = body.generateFrame(channelId);
+                AMQProtocolHandler protocolHandler = ((AMQConnection) _connection).getProtocolHandler();
+
+
+                if (nowait)
+                {
+                    protocolHandler.writeFrame(exchangeDeclare);
+                }
+                else
+                {
+                    protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+                }
+
+//                return null;
+//            }
+//        }, (AMQConnection)_connection).execute();
 
-        if (nowait)
-        {
-            ((AMQConnection) _connection).getProtocolHandler().writeFrame(exchangeDeclare);
-        }
-        else
-        {
-            ((AMQConnection) _connection).getProtocolHandler().syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class,
-                SYNC_TIMEOUT);
-        }
     }
 
     private void createChannel(int channelId) throws AMQException, FailoverException
     {
-        ((AMQConnection) _connection).getProtocolHandler().syncWrite(ChannelOpenBody.createAMQFrame(channelId,
-                ((AMQConnection) _connection).getProtocolHandler().getProtocolMajorVersion(),
-                ((AMQConnection) _connection).getProtocolHandler().getProtocolMinorVersion(), null), // outOfBand
+        ChannelOpenBody body =
+                ((AMQConnection) _connection).getProtocolHandler().getMethodRegistry().createChannelOpenBody(null);
+
+        ((AMQConnection) _connection).getProtocolHandler().syncWrite(body.generateFrame(channelId), // outOfBand
             ChannelOpenOkBody.class);
 
     }
@@ -392,4 +412,28 @@
 
     public void failoverComplete()
     { }
+
+    private static final class MethodDispatcherProxyHandler implements InvocationHandler
+    {
+        private final ClientMethodDispatcherImpl _underlyingDispatcher;
+        private final ChannelCloseMethodHandlerNoCloseOk _handler = ChannelCloseMethodHandlerNoCloseOk.getInstance();
+
+
+        public MethodDispatcherProxyHandler(ClientMethodDispatcherImpl dispatcher)
+        {
+            _underlyingDispatcher = dispatcher;
+        }
+
+        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
+        {
+            if(method.getName().equals("dispatchChannelClose"))
+            {
+                _handler.methodReceived(_underlyingDispatcher.getStateManager(),
+                                        (ChannelCloseBody) args[0], (Integer)args[1]);
+            }
+            Method dispatcherMethod = _underlyingDispatcher.getClass().getMethod(method.getName(), method.getParameterTypes());
+            return dispatcherMethod.invoke(_underlyingDispatcher, args);
+
+        }
+    }
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/NoCloseOKStateManager.java Fri Feb  8 02:09:37 2008
@@ -59,49 +59,7 @@
         super(protocolSession);
     }
 
-    protected void registerListeners()
-    {
-        Map frame2handlerMap = new HashMap();
-
-        // we need to register a map for the null (i.e. all state) handlers otherwise you get
-        // a stack overflow in the handler searching code when you present it with a frame for which
-        // no handlers are registered
-        //
-        _state2HandlersMap.put(null, frame2handlerMap);
-
-        frame2handlerMap = new HashMap();
-        frame2handlerMap.put(ConnectionStartBody.class, ConnectionStartMethodHandler.getInstance());
-        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
-        _state2HandlersMap.put(AMQState.CONNECTION_NOT_STARTED, frame2handlerMap);
-
-        frame2handlerMap = new HashMap();
-        frame2handlerMap.put(ConnectionTuneBody.class, ConnectionTuneMethodHandler.getInstance());
-        frame2handlerMap.put(ConnectionSecureBody.class, ConnectionSecureMethodHandler.getInstance());
-        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
-        _state2HandlersMap.put(AMQState.CONNECTION_NOT_TUNED, frame2handlerMap);
-
-        frame2handlerMap = new HashMap();
-        frame2handlerMap.put(ConnectionOpenOkBody.class, ConnectionOpenOkMethodHandler.getInstance());
-        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
-        _state2HandlersMap.put(AMQState.CONNECTION_NOT_OPENED, frame2handlerMap);
-
-        //
-        // ConnectionOpen handlers
-        //
-        frame2handlerMap = new HashMap();
-        // Use Test Handler for Close methods to not send Close-OKs
-        frame2handlerMap.put(ChannelCloseBody.class, ChannelCloseMethodHandlerNoCloseOk.getInstance());
-
-        frame2handlerMap.put(ChannelCloseOkBody.class, ChannelCloseOkMethodHandler.getInstance());
-        frame2handlerMap.put(ConnectionCloseBody.class, ConnectionCloseMethodHandler.getInstance());
-        frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
-        frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
-        frame2handlerMap.put(BasicCancelOkBody.class, BasicCancelOkMethodHandler.getInstance());
-        frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
-        frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
-        frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
-        _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
-    }
+    
 
 
 }

Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=619823&r1=619822&r2=619823&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Fri Feb  8 02:09:37 2008
@@ -41,6 +41,14 @@
 import javax.jms.TextMessage;
 import javax.jms.TopicSubscriber;
 
+/**
+ * @todo Code to check that a consumer gets only one particular method could be factored into a re-usable method (as
+ *       a static on a base test helper class, e.g. TestUtils.
+ *
+ * @todo Code to create test end-points using session per connection, or all sessions on one connection, to be factored
+ *       out to make creating this test variation simpler. Want to make this variation available through LocalCircuit,
+ *       driven by the test model.
+ */
 public class DurableSubscriptionTest extends QpidTestCase
 {
     private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriptionTest.class);
@@ -113,12 +121,26 @@
         con.close();
     }
 
-    public void testDurability() throws Exception
+    public void testDurabilityAUTOACK() throws AMQException, JMSException, URLSyntaxException
     {
+        durabilityImpl(Session.AUTO_ACKNOWLEDGE);
+    }
 
-        AMQConnection con = (AMQConnection) getConnection("guest", "guest");
-        AMQTopic topic = new AMQTopic(con, "MyDurableSubscriptionTestTopic");
-        Session session1 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+    public void testDurabilityNOACKSessionPerConnection() throws AMQException, JMSException, URLSyntaxException
+    {
+        durabilityImplSessionPerConnection(AMQSession.NO_ACKNOWLEDGE);
+    }
+
+    public void testDurabilityAUTOACKSessionPerConnection() throws AMQException, JMSException, URLSyntaxException
+    {
+        durabilityImplSessionPerConnection(Session.AUTO_ACKNOWLEDGE);
+    }
+
+    private void durabilityImpl(int ackMode) throws AMQException, JMSException, URLSyntaxException
+    {
+        AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+        AMQTopic topic = new AMQTopic(con, "MyTopic");
+        Session session1 = con.createSession(false, ackMode);
         MessageConsumer consumer1 = session1.createConsumer(topic);
 
         Session sessionProd = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -144,10 +166,83 @@
 
         consumer2.close();
 
-        Session session3 = con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        producer.send(session1.createTextMessage("B"));
+
+        _logger.info("Receive message on consumer 1 :expecting B");
+        msg = consumer1.receive(500);
+        assertNotNull("Consumer 1 should get message 'B'.", msg);
+        assertEquals("Incorrect Message recevied on consumer1.", "B", ((TextMessage) msg).getText());
+        _logger.info("Receive message on consumer 1 :expecting null");
+        msg = consumer1.receive(500);
+        assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+        Session session3 = con.createSession(false, ackMode);
         MessageConsumer consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
 
-        producer.send(session1.createTextMessage("B"));
+        _logger.info("Receive message on consumer 3 :expecting B");
+        msg = consumer3.receive(500);
+        assertNotNull("Consumer 3 should get message 'B'.", msg);
+        assertEquals("Incorrect Message recevied on consumer4.", "B", ((TextMessage) msg).getText());
+        _logger.info("Receive message on consumer 3 :expecting null");
+        msg = consumer3.receive(500);
+        assertNull("There should be no more messages for consumption on consumer3.", msg);
+
+        consumer1.close();
+        consumer3.close();
+
+        con.close();
+    }
+
+    private void durabilityImplSessionPerConnection(int ackMode) throws AMQException, JMSException, URLSyntaxException
+    {
+        Message msg;
+
+        // Create producer.
+        AMQConnection con0 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+        con0.start();
+        Session session0 = con0.createSession(false, ackMode);
+
+        AMQTopic topic = new AMQTopic(con0, "MyTopic");
+
+        Session sessionProd = con0.createSession(false, ackMode);
+        MessageProducer producer = sessionProd.createProducer(topic);
+
+        // Create consumer 1.
+        AMQConnection con1 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+        con1.start();
+        Session session1 = con1.createSession(false, ackMode);
+
+        MessageConsumer consumer1 = session0.createConsumer(topic);
+
+        // Create consumer 2.
+        AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+        con2.start();
+        Session session2 = con2.createSession(false, ackMode);
+
+        TopicSubscriber consumer2 = session2.createDurableSubscriber(topic, "MySubscription");
+
+        // Send message and check that both consumers get it and only it.
+        producer.send(session0.createTextMessage("A"));
+
+        msg = consumer1.receive(500);
+        assertNotNull("Message should be available", msg);
+        assertEquals("Message Text doesn't match", "A", ((TextMessage) msg).getText());
+        msg = consumer1.receive(500);
+        assertNull("There should be no more messages for consumption on consumer1.", msg);
+
+        msg = consumer2.receive();
+        assertNotNull(msg);
+        assertEquals("Consumer 2 should also received the first msg.", "A", ((TextMessage) msg).getText());
+        msg = consumer2.receive(500);
+        assertNull("There should be no more messages for consumption on consumer2.", msg);
+
+        // Detach the durable subscriber.
+        consumer2.close();
+        session2.close();
+        con2.close();
+
+        // Send message and receive on open consumer.
+        producer.send(session0.createTextMessage("B"));
 
         _logger.info("Receive message on consumer 1 :expecting B");
         msg = consumer1.receive(100);
@@ -156,14 +251,26 @@
         msg = consumer1.receive(100);
         assertEquals(null, msg);
 
+        // Re-attach a new consumer to the durable subscription, and check that it gets the message that it missed.
+        AMQConnection con3 = new AMQConnection("vm://:1", "guest", "guest", "test", "test");
+        con3.start();
+        Session session3 = con3.createSession(false, ackMode);
+
+        TopicSubscriber consumer3 = session3.createDurableSubscriber(topic, "MySubscription");
+
         _logger.info("Receive message on consumer 3 :expecting B");
         msg = consumer3.receive(100);
         assertEquals("B", ((TextMessage) msg).getText());
         _logger.info("Receive message on consumer 3 :expecting null");
-        msg = consumer3.receive(100);
-        assertEquals(null, msg);
+        msg = consumer3.receive(500);
+        assertNull("There should be no more messages for consumption on consumer3.", msg);
 
-        con.close();
+        consumer1.close();
+        consumer3.close();
+
+        con0.close();
+        con1.close();
+        con3.close();
     }
 
     public static junit.framework.Test suite()