You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/11/26 02:41:32 UTC

svn commit: r598105 - in /incubator/qpid/branches/M2.1: gentools/src/org/apache/qpid/gentools/ java/broker/src/main/java/org/apache/qpid/server/exchange/ java/broker/src/main/java/org/apache/qpid/server/handler/ java/client/src/main/java/org/apache/qpi...

Author: rgodfrey
Date: Sun Nov 25 17:41:31 2007
New Revision: 598105

URL: http://svn.apache.org/viewvc?rev=598105&view=rev
Log:
QPID-567 : Add multiversion support to Qpid/Java, fixed client support when server returns Protocol header.
           Added QueueUnbind
           Added ability to select protocol version in ConnectionURL or with -Dorg.apache.qpid.amqp_version
           

Added:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
Modified:
    incubator/qpid/branches/M2.1/gentools/src/org/apache/qpid/gentools/Generator.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
    incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
    incubator/qpid/branches/M2.1/java/common/templates/model/ProtocolVersionListClass.vm

Modified: incubator/qpid/branches/M2.1/gentools/src/org/apache/qpid/gentools/Generator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/gentools/src/org/apache/qpid/gentools/Generator.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/gentools/src/org/apache/qpid/gentools/Generator.java (original)
+++ incubator/qpid/branches/M2.1/gentools/src/org/apache/qpid/gentools/Generator.java Sun Nov 25 17:41:31 2007
@@ -479,7 +479,11 @@
             Template velocityTemplate = Velocity.getTemplate(template.getName());
             velocityTemplate.merge(context, sw);
             String filename = String.valueOf(context.get("filename"));
-            FileWriter outputFileWriter = new FileWriter(getOutputDirectory() + Utils.FILE_SEPARATOR + filename);
+
+            File outputFile = new File(getOutputDirectory() + Utils.FILE_SEPARATOR + filename);
+            outputFile.getParentFile().mkdirs();
+            FileWriter outputFileWriter = new FileWriter(outputFile);
+
             outputFileWriter.append(sw.toString());
             outputFileWriter.close();
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java Sun Nov 25 17:41:31 2007
@@ -34,6 +34,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -150,7 +151,7 @@
 
         if (!_index.remove(routingKey, queue))
         {
-            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() +
+            throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() +
                                    " with routing key " + routingKey + ". No queue was registered with that _routing key");
         }
     }

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java Sun Nov 25 17:41:31 2007
@@ -22,6 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -277,7 +278,7 @@
         List<AMQQueue> queues = _routingKey2queues.get(routingKey);
         if (queues == null)
         {
-            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName()
+            throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
                                    + " with routing key " + routingKey + ". No queue was registered with that _routing key");
 
         }
@@ -285,7 +286,7 @@
         boolean removedQ = queues.remove(queue);
         if (!removedQ)
         {
-            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName()
+            throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
                                    + " with routing key " + routingKey);
         }
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java Sun Nov 25 17:41:31 2007
@@ -22,6 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
@@ -146,7 +147,7 @@
 
         if (!_queues.remove(queue))
         {
-            throw new AMQException("Queue " + queue + " was not registered with exchange " + this.getName() + ". ");
+            throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() + ". ");
         }
     }
 

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Sun Nov 25 17:41:31 2007
@@ -22,6 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.AMQTypedValue;
@@ -200,7 +201,11 @@
     public void deregisterQueue(AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
     {
         _logger.debug("Exchange " + getName() + ": Unbinding " + queue.getName());
-        _bindings.remove(new Registration(new HeadersBinding(args), queue));
+        if(!_bindings.remove(new Registration(new HeadersBinding(args), queue)))
+        {
+            throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName()
+                                   + " with headers args " + args);    
+        }
     }
 
     public void route(AMQMessage payload) throws AMQException

Added: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java?rev=598105&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java (added)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueUnbindHandler.java Sun Nov 25 17:41:31 2007
@@ -0,0 +1,110 @@
+package org.apache.qpid.server.handler;
+
+import org.apache.log4j.Logger;
+
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQInvalidRoutingKeyException;
+import org.apache.qpid.protocol.AMQConstant;
+
+public class QueueUnbindHandler implements StateAwareMethodListener<QueueUnbindBody>
+{
+    private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class);
+
+    private static final QueueUnbindHandler _instance = new QueueUnbindHandler();
+
+    public static QueueUnbindHandler getInstance()
+    {
+        return _instance;
+    }
+
+    private QueueUnbindHandler()
+    {
+    }
+
+    public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException
+    {
+        AMQProtocolSession session = stateManager.getProtocolSession();
+        VirtualHost virtualHost = session.getVirtualHost();
+        ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+        QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+
+
+        final AMQQueue queue;
+        final AMQShortString routingKey;
+
+        if (body.getQueue() == null)
+        {
+            AMQChannel channel = session.getChannel(channelId);
+
+            if (channel == null)
+            {
+                throw body.getChannelNotFoundException(channelId);
+            }
+
+            queue = channel.getDefaultQueue();
+
+            if (queue == null)
+            {
+                throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
+            }
+
+            routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+
+        }
+        else
+        {
+            queue = queueRegistry.getQueue(body.getQueue());
+            routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
+        }
+
+        if (queue == null)
+        {
+            throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
+        }
+        final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
+        if (exch == null)
+        {
+            throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
+        }
+
+
+        try
+        {
+            queue.unBind(routingKey, body.getArguments(), exch);
+        }
+        catch (AMQInvalidRoutingKeyException rke)
+        {
+            throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
+        }
+        catch (AMQException e)
+        {
+            if(e.getErrorCode() == AMQConstant.NOT_FOUND)
+            {
+                throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e);
+            }
+            throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString());
+        }
+
+        if (_log.isInfoEnabled())
+        {
+            _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
+        }
+
+        MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+        AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody();
+        session.writeFrame(responseBody.generateFrame(channelId));
+
+
+    }
+}

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java Sun Nov 25 17:41:31 2007
@@ -36,6 +36,9 @@
 
     private static final BasicRecoverSyncMethodHandler _basicRecoverSyncMethodHandler =
             BasicRecoverSyncMethodHandler.getInstance();
+    private static final QueueUnbindHandler _queueUnbindHandler =
+            QueueUnbindHandler.getInstance();
+
 
     public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager)
     {
@@ -155,6 +158,7 @@
 
     public boolean dispatchQueueUnbind(QueueUnbindBody body, int channelId) throws AMQException
     {
-        return false;
+        _queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
+        return true;
     }
 }

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Sun Nov 25 17:41:31 2007
@@ -32,13 +32,7 @@
 import org.apache.qpid.client.state.AMQStateManager;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ChannelLimitReachedException;
 import org.apache.qpid.jms.Connection;
@@ -161,6 +155,7 @@
     /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
     private final ExecutorService _taskPool = Executors.newCachedThreadPool();
     private static final long DEFAULT_TIMEOUT = 1000 * 30;
+    private ProtocolVersion _protocolVersion;
 
     /**
      * @param broker      brokerdetails
@@ -253,6 +248,9 @@
         _clientName = connectionURL.getClientName();
         _username = connectionURL.getUsername();
         _password = connectionURL.getPassword();
+
+        _protocolVersion = connectionURL.getProtocolVersion();
+
         setVirtualHost(connectionURL.getVirtualHost());
 
         if (connectionURL.getDefaultQueueExchangeName() != null)
@@ -393,16 +391,24 @@
 
     private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
     {
+        final Set<AMQState> openOrClosedStates =
+                EnumSet.of(AMQState.CONNECTION_OPEN, AMQState.CONNECTION_CLOSED);
         try
         {
             TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail);
             // this blocks until the connection has been set up or when an error
             // has prevented the connection being set up
-            _protocolHandler.attainState(AMQState.CONNECTION_OPEN);
-            _failoverPolicy.attainedConnection();
 
-            // Again this should be changed to a suitable notify
-            _connected = true;
+            //_protocolHandler.attainState(AMQState.CONNECTION_OPEN);
+            AMQState state = _protocolHandler.attainState(openOrClosedStates);
+            if(state == AMQState.CONNECTION_OPEN)
+            {
+
+                _failoverPolicy.attainedConnection();
+
+                // Again this should be changed to a suitable notify
+                _connected = true;
+            }
         }
         catch (AMQException e)
         {
@@ -1285,4 +1291,16 @@
     {
         return _sessions.get(channelId);
     }
+
+    public ProtocolVersion getProtocolVersion()
+    {
+        return _protocolVersion;
+    }
+
+    public void setProtocolVersion(ProtocolVersion protocolVersion)
+    {
+        _protocolVersion = protocolVersion;
+        _protocolHandler.getProtocolSession().setProtocolVersion(protocolVersion);
+    }
+
 }

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnectionURL.java Sun Nov 25 17:41:31 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.client;
 
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionURL;
 import org.apache.qpid.url.URLHelper;
@@ -52,6 +53,7 @@
     private AMQShortString _defaultTopicExchangeName;
     private AMQShortString _temporaryTopicExchangeName;
     private AMQShortString _temporaryQueueExchangeName;
+    private ProtocolVersion _protocolVersion = ProtocolVersion.defaultProtocolVersion();
 
     public AMQConnectionURL(String fullURL) throws URLSyntaxException
     {
@@ -255,6 +257,15 @@
         {
             _temporaryTopicExchangeName = new AMQShortString(_options.get(OPTIONS_TEMPORARY_TOPIC_EXCHANGE));
         }
+        if(_options.containsKey(OPTIONS_PROTOCOL_VERSION))
+        {
+            ProtocolVersion pv = ProtocolVersion.parse(_options.get(OPTIONS_PROTOCOL_VERSION));
+            if(pv != null)
+            {
+                _protocolVersion = pv;
+            }
+        }
+
     }
 
     public String getURL()
@@ -375,6 +386,11 @@
     public AMQShortString getTemporaryTopicExchangeName()
     {
         return _temporaryTopicExchangeName;
+    }
+
+    public ProtocolVersion getProtocolVersion()
+    {
+        return _protocolVersion;
     }
 
     public String toString()

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Sun Nov 25 17:41:31 2007
@@ -52,6 +52,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 
@@ -387,94 +388,109 @@
 
     public void messageReceived(IoSession session, Object message) throws Exception
     {
-        final boolean debug = _logger.isDebugEnabled();
-        final long msgNumber = ++_messageReceivedCount;
-
-        if (debug && ((msgNumber % 1000) == 0))
+        if(message instanceof AMQFrame)
         {
-            _logger.debug("Received " + _messageReceivedCount + " protocol messages");
-        }
+            final boolean debug = _logger.isDebugEnabled();
+            final long msgNumber = ++_messageReceivedCount;
 
-        AMQFrame frame = (AMQFrame) message;
+            if (debug && ((msgNumber % 1000) == 0))
+            {
+                _logger.debug("Received " + _messageReceivedCount + " protocol messages");
+            }
 
-        final AMQBody bodyFrame = frame.getBodyFrame();
+            AMQFrame frame = (AMQFrame) message;
 
-        HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
+            final AMQBody bodyFrame = frame.getBodyFrame();
 
-        switch (bodyFrame.getFrameType())
-        {
-            case AMQMethodBody.TYPE:
+            HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
 
-                if (debug)
-                {
-                    _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
-                }
+            switch (bodyFrame.getFrameType())
+            {
+                case AMQMethodBody.TYPE:
 
-                final AMQMethodEvent<AMQMethodBody> evt =
-                        new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+                    if (debug)
+                    {
+                        _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
+                    }
 
-                try
-                {
+                    final AMQMethodEvent<AMQMethodBody> evt =
+                            new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
 
-                    boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
-                    if (!_frameListeners.isEmpty())
+                    try
                     {
-                        Iterator it = _frameListeners.iterator();
-                        while (it.hasNext())
+
+                        boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+                        if (!_frameListeners.isEmpty())
                         {
-                            final AMQMethodListener listener = (AMQMethodListener) it.next();
-                            wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+                            Iterator it = _frameListeners.iterator();
+                            while (it.hasNext())
+                            {
+                                final AMQMethodListener listener = (AMQMethodListener) it.next();
+                                wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+                            }
                         }
-                    }
 
-                    if (!wasAnyoneInterested)
-                    {
-                        throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"
-                                               + _frameListeners);
+                        if (!wasAnyoneInterested)
+                        {
+                            throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"
+                                                   + _frameListeners);
+                        }
                     }
-                }
-                catch (AMQException e)
-                {
-                    getStateManager().error(e);
-                    if (!_frameListeners.isEmpty())
+                    catch (AMQException e)
                     {
-                        Iterator it = _frameListeners.iterator();
-                        while (it.hasNext())
+                        getStateManager().error(e);
+                        if (!_frameListeners.isEmpty())
                         {
-                            final AMQMethodListener listener = (AMQMethodListener) it.next();
-                            listener.error(e);
+                            Iterator it = _frameListeners.iterator();
+                            while (it.hasNext())
+                            {
+                                final AMQMethodListener listener = (AMQMethodListener) it.next();
+                                listener.error(e);
+                            }
                         }
+
+                        exceptionCaught(session, e);
                     }
 
-                    exceptionCaught(session, e);
-                }
+                    break;
 
-                break;
+                case ContentHeaderBody.TYPE:
 
-            case ContentHeaderBody.TYPE:
+                    _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
+                    break;
 
-                _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
-                break;
+                case ContentBody.TYPE:
 
-            case ContentBody.TYPE:
+                    _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
+                    break;
 
-                _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
-                break;
+                case HeartbeatBody.TYPE:
 
-            case HeartbeatBody.TYPE:
+                    if (debug)
+                    {
+                        _logger.debug("Received heartbeat");
+                    }
 
-                if (debug)
-                {
-                    _logger.debug("Received heartbeat");
-                }
+                    break;
 
-                break;
+                default:
 
-            default:
+            }
 
+            _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
         }
+        else if (message instanceof ProtocolInitiation)
+        {
+            // We get here if the server sends a response to our initial protocol header
+            // suggesting an alternate ProtocolVersion; the server will then close the
+            // connection.
+            ProtocolInitiation protocolInit = (ProtocolInitiation) message;
+            ProtocolVersion pv = protocolInit.checkVersion();
+            getConnection().setProtocolVersion(pv);
 
-        _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
+            // get round a bug in old versions of qpid whereby the connection is not closed
+            _stateManager.changeState(AMQState.CONNECTION_CLOSED);
+        }
     }
 
     private static int _messagesOut;
@@ -514,6 +530,12 @@
     {
         getStateManager().attainState(s);
     }
+
+    public AMQState attainState(Set<AMQState> states) throws AMQException
+    {
+        return getStateManager().attainState(states);
+    }
+
 
     /**
      * Convenience method that writes a frame to the protocol session. Equivalent to calling

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Sun Nov 25 17:41:31 2007
@@ -121,7 +121,7 @@
         _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
         _stateManager = stateManager;
         _stateManager.setProtocolSession(this);
-        _protocolVersion = ProtocolVersion.getLatestSupportedVersion();
+        _protocolVersion = connection.getProtocolVersion();
         _methodDispatcher = ClientMethodDispatcherImpl.newMethodDispatcher(ProtocolVersion.getLatestSupportedVersion(),
                                                                  stateManager);
         _connection = connection;
@@ -133,7 +133,7 @@
         // start the process of setting up the connection. This is the first place that
         // data is written to the server.
 
-        _minaProtocolSession.write(new ProtocolInitiation(ProtocolVersion.getLatestSupportedVersion()));
+        _minaProtocolSession.write(new ProtocolInitiation(_connection.getProtocolVersion()));
     }
 
     public String getClientID()

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQState.java Sun Nov 25 17:41:31 2007
@@ -24,8 +24,22 @@
  * States used in the AMQ protocol. Used by the finite state machine to determine
  * valid responses.
  */
-public class AMQState
+public enum AMQState
 {
+
+    CONNECTION_NOT_STARTED(1, "CONNECTION_NOT_STARTED"),
+
+    CONNECTION_NOT_TUNED(2, "CONNECTION_NOT_TUNED"),
+
+    CONNECTION_NOT_OPENED(3, "CONNECTION_NOT_OPENED"),
+
+    CONNECTION_OPEN(4, "CONNECTION_OPEN"),
+
+    CONNECTION_CLOSING(5, "CONNECTION_CLOSING"),
+
+    CONNECTION_CLOSED(6, "CONNECTION_CLOSED");
+
+
     private final int _id;
 
     private final String _name;
@@ -41,16 +55,6 @@
         return "AMQState: id = " + _id + " name: " + _name;
     }
 
-    public static final AMQState CONNECTION_NOT_STARTED = new AMQState(1, "CONNECTION_NOT_STARTED");
-    
-    public static final AMQState CONNECTION_NOT_TUNED = new AMQState(2, "CONNECTION_NOT_TUNED");
-    
-    public static final AMQState CONNECTION_NOT_OPENED = new AMQState(3, "CONNECTION_NOT_OPENED");        
-
-    public static final AMQState CONNECTION_OPEN = new AMQState(4, "CONNECTION_OPEN");
-
-    public static final AMQState CONNECTION_CLOSING = new AMQState(5, "CONNECTION_CLOSING");
-    
-    public static final AMQState CONNECTION_CLOSED = new AMQState(6, "CONNECTION_CLOSED");
-    
+
+
 }

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Sun Nov 25 17:41:31 2007
@@ -30,6 +30,7 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 /**
@@ -164,5 +165,42 @@
     public MethodRegistry getMethodRegistry()
     {
         return getProtocolSession().getMethodRegistry();
+    }
+
+    /**
+     * Waits until the current state is a member of stateSet, for at most MAXIMUM_STATE_WAIT_TIME milliseconds.
+     *
+     * @param stateSet the set of acceptable states
+     * @return the current state, which is a member of stateSet
+     * @throws AMQException if no state in stateSet is attained within the permitted time
+     */
+    public AMQState attainState(Set<AMQState> stateSet) throws AMQException
+    {
+        synchronized (_stateLock)
+        {
+            final long waitUntilTime = System.currentTimeMillis() + MAXIMUM_STATE_WAIT_TIME;
+            long waitTime = MAXIMUM_STATE_WAIT_TIME;
+            while (!stateSet.contains(_currentState) && (waitTime > 0))
+            {
+                try
+                {
+                    // Wait only for the time remaining, so repeated wakeups cannot extend the overall deadline.
+                    _stateLock.wait(waitTime);
+                }
+                catch (InterruptedException e)
+                {
+                    _logger.warn("Thread interrupted");
+                }
+                waitTime = waitUntilTime - System.currentTimeMillis();
+            }
+            if (!stateSet.contains(_currentState))
+            {
+                final String message = "State not achieved within permitted time.  Current state " + _currentState
+                                       + ", desired state: " + stateSet;
+                _logger.warn(message);
+                throw new AMQException(message);
+            }
+            return _currentState;
+        }
+    }
 }

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/jms/ConnectionURL.java Sun Nov 25 17:41:31 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.jms;
 
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
 
 import java.util.List;
 
@@ -41,6 +42,7 @@
     public static final String OPTIONS_DEFAULT_QUEUE_EXCHANGE = "defaultQueueExchange";
     public static final String OPTIONS_TEMPORARY_TOPIC_EXCHANGE = "temporaryTopicExchange";
     public static final String OPTIONS_TEMPORARY_QUEUE_EXCHANGE = "temporaryQueueExchange";
+    public static final String OPTIONS_PROTOCOL_VERSION = "protocolVersion";
 
     String getURL();
 
@@ -83,4 +85,6 @@
     AMQShortString getTemporaryQueueExchangeName();
 
     AMQShortString getTemporaryTopicExchangeName();
+
+    ProtocolVersion getProtocolVersion();
 }

Modified: incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ incubator/qpid/branches/M2.1/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Sun Nov 25 17:41:31 2007
@@ -56,6 +56,7 @@
 
     /** Flag to indicate whether this decoder needs to handle protocol initiation. */
     private boolean _expectProtocolInitiation;
+    private boolean firstDecode = true;
 
     /**
      * Creates a new AMQP decoder.
@@ -81,14 +82,24 @@
      */
     protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
     {
-        if (_expectProtocolInitiation)
+
+        boolean decoded;
+        if (_expectProtocolInitiation  
+            || (firstDecode
+                && (in.remaining() > 0)
+                && (in.get(in.position()) == (byte)'A')))
         {
-            return doDecodePI(session, in, out);
+            decoded = doDecodePI(session, in, out);
         }
         else
         {
-            return doDecodeDataBlock(session, in, out);
+            decoded = doDecodeDataBlock(session, in, out);
+        }
+        if(firstDecode && decoded)
+        {
+            firstDecode = false;
         }
+        return decoded;
     }
 
     /**

Modified: incubator/qpid/branches/M2.1/java/common/templates/model/ProtocolVersionListClass.vm
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/common/templates/model/ProtocolVersionListClass.vm?rev=598105&r1=598104&r2=598105&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/common/templates/model/ProtocolVersionListClass.vm (original)
+++ incubator/qpid/branches/M2.1/java/common/templates/model/ProtocolVersionListClass.vm Sun Nov 25 17:41:31 2007
@@ -33,6 +33,8 @@
 import java.util.SortedSet;
 import java.util.Collections;
 import java.util.TreeSet;
+import java.util.Map;
+import java.util.HashMap;
 
 
 public class ProtocolVersion  implements Comparable
@@ -124,6 +126,9 @@
     }
     
     private static final SortedSet<ProtocolVersion> _supportedVersions;
+	private static final Map<String, ProtocolVersion> _nameToVersionMap =
+	                         new HashMap<String, ProtocolVersion>();
+    private static final ProtocolVersion _defaultVersion;							 
 
 	
 #foreach( $version in $model.getVersionSet() )
@@ -138,8 +143,17 @@
 #foreach( $version in $model.getVersionSet() )
 #set( $versionId = "v$version.getMajor()_$version.getMinor()" )
         versions.add($versionId);
+		_nameToVersionMap.put("${version.getMajor()}-${version.getMinor()}", $versionId);
 #end
         _supportedVersions = Collections.unmodifiableSortedSet(versions);
+		
+		
+		ProtocolVersion systemDefinedVersion =
+		    _nameToVersionMap.get(System.getProperty("org.apache.qpid.amqp_version"));
+			
+	    _defaultVersion = (systemDefinedVersion == null) 
+		                      ? getLatestSupportedVersion() 
+							  : systemDefinedVersion;
     }
 
     
@@ -149,7 +163,16 @@
     }
     
     
+
+    public static ProtocolVersion parse(String name)
+    {
+        return _nameToVersionMap.get(name);
+    }
     
+    public static ProtocolVersion defaultProtocolVersion()
+    {
+        return _defaultVersion;
+    }
     
 
 }