You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/16 16:16:55 UTC

svn commit: r496725 [3/11] - in /incubator/qpid/branches/perftesting/qpid: gentools/ gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/ gentools/templ.java/ java/ java/broker/ java/broker/etc/ java/broker/src/main/java/org/apache/qpid/server/ j...

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Tue Jan 16 07:16:39 2007
@@ -21,12 +21,11 @@
 package org.apache.qpid.server.exchange;
 
 import org.apache.log4j.Logger;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQTypedValue;
 
-import java.util.Collections;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
 
 /**
  * Defines binding and matching based on a set of headers.
@@ -35,11 +34,53 @@
 {
     private static final Logger _logger = Logger.getLogger(HeadersBinding.class);
 
-    private final Map _mappings = new HashMap();
-    private final Set<Object> required = new HashSet<Object>();
-    private final Set<Map.Entry> matches = new HashSet<Map.Entry>();
+    private final FieldTable _mappings = new FieldTable();
+    private final Set<String> required = new HashSet<String>();
+    private final Map<String,Object> matches = new HashMap<String,Object>();
     private boolean matchAny;
 
+    private final class MatchesOrProcessor implements FieldTable.FieldTableElementProcessor
+    {
+        private Boolean _result = Boolean.FALSE;
+
+        public boolean processElement(String propertyName, AMQTypedValue value)
+        {
+            if((value != null) && (value.getValue() != null) && value.getValue().equals(matches.get(propertyName)))
+            {
+                _result = Boolean.TRUE;
+                return false;
+            }
+            return true;
+        }
+
+        public Object getResult()
+        {
+            return _result;
+        }
+    }
+
+    private final class RequiredOrProcessor implements FieldTable.FieldTableElementProcessor
+    {
+        Boolean _result = Boolean.FALSE;
+
+        public boolean processElement(String propertyName, AMQTypedValue value)
+        {
+            if(required.contains(propertyName))
+            {
+                _result = Boolean.TRUE;
+                return false;
+            }
+            return true;
+        }
+
+        public Object getResult()
+        {
+            return _result;
+        }
+    }
+
+
+
     /**
      * Creates a binding for a set of mappings. Those mappings whose value is
      * null or the empty string are assumed only to be required headers, with
@@ -47,33 +88,50 @@
      * define a required match of value. 
      * @param mappings the defined mappings this binding should use
      */
-    HeadersBinding(Map mappings)
+
+    HeadersBinding(FieldTable mappings)
     {
-        //noinspection unchecked
-        this(mappings == null ? new HashSet<Map.Entry>() : mappings.entrySet());
-        _mappings.putAll(mappings);
+        Enumeration propertyNames = mappings.getPropertyNames();
+        while(propertyNames.hasMoreElements())
+        {
+            String propName = (String) propertyNames.nextElement();
+            _mappings.put(propName, mappings.getObject(propName));
+        }
+        initMappings();
     }
 
-    private HeadersBinding(Set<Map.Entry> entries)
+    private void initMappings()
     {
-        for (Map.Entry e : entries)
+
+        _mappings.processOverElements(new FieldTable.FieldTableElementProcessor()
         {
-            if (isSpecial(e.getKey()))
-            {
-                processSpecial((String) e.getKey(), e.getValue());
-            }
-            else if (e.getValue() == null || e.getValue().equals(""))
+
+            public boolean processElement(String propertyName, AMQTypedValue value)
             {
-                required.add(e.getKey());
+                if (isSpecial(propertyName))
+                {
+                    processSpecial(propertyName, value.getValue());
+                }
+                else if (value.getValue() == null || value.getValue().equals(""))
+                {
+                    required.add(propertyName);
+                }
+                else
+                {
+                    matches.put(propertyName,value.getValue());
+                }
+
+                return true;
             }
-            else
+
+            public Object getResult()
             {
-                matches.add(e);
+                return null;
             }
-        }
+        });
     }
 
-    protected Map getMappings()
+    protected FieldTable getMappings()
     {
         return _mappings;
     }
@@ -84,7 +142,7 @@
      * @return true if the headers define any required keys and match any required
      * values
      */
-    public boolean matches(Map headers)
+    public boolean matches(FieldTable headers)
     {
         if(headers == null)
         {
@@ -96,18 +154,37 @@
         }
     }
 
-    private boolean and(Map headers)
+    private boolean and(FieldTable headers)
     {
-        //need to match all the defined mapping rules:
-        return headers.keySet().containsAll(required)
-                && headers.entrySet().containsAll(matches);
+        if(headers.keys().containsAll(required))
+        {
+            for(Map.Entry<String, Object> e : matches.entrySet())
+            {
+                if(!e.getValue().equals(headers.getObject(e.getKey())))
+                {
+                    return false;
+                }
+            }
+            return true;
+        }
+        else
+        {
+            return false;
+        }
     }
 
-    private boolean or(Map headers)
+
+    private boolean or(final FieldTable headers)
     {
-        //only need to match one mapping rule:
-        return !Collections.disjoint(headers.keySet(), required)
-                || !Collections.disjoint(headers.entrySet(), matches);
+        if(required.isEmpty() || !(Boolean) headers.processOverElements(new RequiredOrProcessor()))
+        {
+            return ((!matches.isEmpty()) && (Boolean) headers.processOverElements(new MatchesOrProcessor()))
+                    || (required.isEmpty() && matches.isEmpty());
+        }
+        else
+        {
+            return true;
+        }
     }
 
     private void processSpecial(String key, Object value)

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Jan 16 07:16:39 2007
@@ -22,10 +22,7 @@
 
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.*;
 import org.apache.qpid.server.management.MBeanConstructor;
 import org.apache.qpid.server.management.MBeanDescription;
 import org.apache.qpid.server.queue.AMQMessage;
@@ -34,10 +31,7 @@
 
 import javax.management.JMException;
 import javax.management.openmbean.*;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 /**
@@ -119,16 +113,24 @@
                 String queueName = registration.queue.getName();
 
                 HeadersBinding headers = registration.binding;
-                Map<Object, Object> headerMappings = headers.getMappings();
-                List<String> mappingList = new ArrayList<String>();
+                FieldTable headerMappings = headers.getMappings();
+                final List<String> mappingList = new ArrayList<String>();
 
-                for (Map.Entry<Object, Object> en : headerMappings.entrySet())
+                headerMappings.processOverElements(new FieldTable.FieldTableElementProcessor()
                 {
-                    String key = en.getKey().toString();
-                    String value = en.getValue().toString();
 
-                    mappingList.add(key + "=" + value);
-                }
+                    public boolean processElement(String propertyName, AMQTypedValue value)
+                    {
+                        mappingList.add(propertyName + "=" + value.getValue());
+                        return true;
+                    }
+
+                    public Object getResult()
+                    {
+                        return mappingList;
+                    }
+                });
+
 
                 Object[] bindingItemValues = {count++, queueName, mappingList.toArray(new String[0])};
                 CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
@@ -155,7 +157,7 @@
             }
 
             String[] bindings  = binding.split(",");
-            FieldTable fieldTable = FieldTableFactory.newFieldTable();
+            FieldTable bindingMap = new FieldTable();
             for (int i = 0; i < bindings.length; i++)
             {
                 String[] keyAndValue = bindings[i].split("=");
@@ -163,10 +165,10 @@
                 {
                     throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
                 }
-                fieldTable.put(keyAndValue[0], keyAndValue[1]);
+                bindingMap.setString(keyAndValue[0], keyAndValue[1]);
             }
 
-            _bindings.add(new Registration(new HeadersBinding(fieldTable), queue));
+            _bindings.add(new Registration(new HeadersBinding(bindingMap), queue));
         }
 
     } // End of MBean class
@@ -185,7 +187,7 @@
 
     public void route(AMQMessage payload) throws AMQException
     {
-        Map headers = getHeaders(payload.getContentHeaderBody());
+        FieldTable headers = getHeaders(payload.getContentHeaderBody());
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
@@ -248,7 +250,7 @@
         return !_bindings.isEmpty();
     }
 
-    protected Map getHeaders(ContentHeaderBody contentHeaderFrame)
+    protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame)
     {
         //what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
         //but these are not yet implemented.

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -54,7 +54,12 @@
         channel.unsubscribeConsumer(protocolSession, body.consumerTag);
         if(!body.nowait)
         {
-            final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), body.consumerTag);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                body.consumerTag);	// consumerTag
             protocolSession.writeFrame(responseFrame);
         }
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -81,7 +81,12 @@
                                                               body.arguments, body.noLocal);
                 if (!body.nowait)
                 {
-                    session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        consumerTag));		// consumerTag
                 }
 
                 //now allow queue to start async processing of any backlog of messages
@@ -90,16 +95,28 @@
             catch (AMQInvalidSelectorException ise)
             {
                 _log.info("Closing connection due to invalid selector");
-                session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
-                                                                   ise.getMessage(), BasicConsumeBody.CLASS_ID,
-                                                                   BasicConsumeBody.METHOD_ID));
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    BasicConsumeBody.getClazz((byte)8, (byte)0),	// classId
+                    BasicConsumeBody.getMethod((byte)8, (byte)0),	// methodId
+                    AMQConstant.INVALID_SELECTOR.getCode(),	// replyCode
+                    ise.getMessage()));		// replyText
             }
             catch (ConsumerTagNotUniqueException e)
             {
                 String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
-                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
-                                                                      BasicConsumeBody.CLASS_ID,
-                                                                      BasicConsumeBody.METHOD_ID));
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    BasicConsumeBody.getClazz((byte)8, (byte)0),	// classId
+                    BasicConsumeBody.getMethod((byte)8, (byte)0),	// methodId
+                    AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
+                    msg));	// replyText
             }
         }
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -64,7 +64,15 @@
             protocolSession.closeChannel(evt.getChannelId());
             // TODO: modify code gen to make getClazz and getMethod public methods rather than protected
             // then we can remove the hardcoded 0,0
-            AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), 500, "Unknown exchange name", 0, 0);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                ChannelCloseBody.getClazz((byte)8, (byte)0),	// classId
+                ChannelCloseBody.getMethod((byte)8, (byte)0),	// methodId
+                500,	// replyCode
+                "Unknown exchange name");	// replyText
             protocolSession.writeFrame(cf);
         }
         else

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java Tue Jan 16 07:16:39 2007
@@ -44,6 +44,9 @@
                                AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
     {
         session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
-        session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody()));
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0)));
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Tue Jan 16 07:16:39 2007
@@ -55,7 +55,10 @@
         _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
                      " and method " + body.methodId);
         protocolSession.closeChannel(evt.getChannelId());
-        AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
         protocolSession.writeFrame(response);
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Tue Jan 16 07:16:39 2007
@@ -58,6 +58,12 @@
         channel.setSuspended(!body.active);
         _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
 
-        AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), body.active);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            body.active);	// active
         protocolSession.writeFrame(response);
-    }}
+    }
+}

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Tue Jan 16 07:16:39 2007
@@ -55,7 +55,10 @@
         final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(),
                                                   exchangeRegistry);
         protocolSession.addChannel(channel);
-        AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
         protocolSession.writeFrame(response);
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -62,7 +62,10 @@
         {
             _logger.error("Error closing protocol session: " + e, e);
         }
-        final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
         protocolSession.writeFrame(response);
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -64,7 +64,12 @@
             contextKey = generateClientID();
         }
         protocolSession.setContextKey(contextKey);
-        AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, contextKey);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            contextKey);	// knownHosts
         stateManager.changeState(AMQState.CONNECTION_OPEN);
         protocolSession.writeFrame(response);
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -75,25 +75,43 @@
                 // throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
                 _logger.info("Authentication failed");
                 stateManager.changeState(AMQState.CONNECTION_CLOSING);
-                AMQFrame close = ConnectionCloseBody.createAMQFrame(0, AMQConstant.NOT_ALLOWED.getCode(),
-                        AMQConstant.NOT_ALLOWED.getName(),
-                        ConnectionCloseBody.CLASS_ID,
-                        ConnectionCloseBody.METHOD_ID);
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    ConnectionCloseBody.getClazz((byte)8, (byte)0),		// classId
+                    ConnectionCloseBody.getMethod((byte)8, (byte)0),	// methodId
+                    AMQConstant.NOT_ALLOWED.getCode(),	// replyCode
+                    AMQConstant.NOT_ALLOWED.getName());	// replyText
                 protocolSession.writeFrame(close);
                 disposeSaslServer(protocolSession);
                 break;
             case SUCCESS:
                 _logger.info("Connected as: " + ss.getAuthorizationID());
                 stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-                AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE,
-                        ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
-                        HeartbeatConfig.getInstance().getDelay());
+                // TODO: Check the value of channelMax here: This should be the max
+                // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire),
+                // not Integer.MAX_VALUE (which is signed 4 bytes).
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    Integer.MAX_VALUE,	// channelMax
+                    ConnectionStartOkMethodHandler.getConfiguredFrameSize(),	// frameMax
+                    HeartbeatConfig.getInstance().getDelay());	// heartbeat
                 protocolSession.writeFrame(tune);
                 disposeSaslServer(protocolSession);
                 break;
             case CONTINUE:
                 stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
-                AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+                // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                // Be aware of possible changes to parameter order as versions change.
+                AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+                    (byte)8, (byte)0,	// AMQP version (major, minor)
+                    authResult.challenge);	// challenge
                 protocolSession.writeFrame(challenge);
         }
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -92,13 +92,24 @@
                     _logger.info("Connected as: " + ss.getAuthorizationID());
 
                     stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
-                    AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
-                                                                      HeartbeatConfig.getInstance().getDelay());
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        Integer.MAX_VALUE,	// channelMax
+                        getConfiguredFrameSize(),	// frameMax
+                        HeartbeatConfig.getInstance().getDelay());	// heartbeat
                     protocolSession.writeFrame(tune);
                     break;
                 case CONTINUE:
                     stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
-                    AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+                    // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+                    // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+                    // Be aware of possible changes to parameter order as versions change.
+                    AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+                        (byte)8, (byte)0,	// AMQP version (major, minor)
+                        authResult.challenge);	// challenge
                     protocolSession.writeFrame(challenge);
             }
         }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Tue Jan 16 07:16:39 2007
@@ -64,6 +64,11 @@
                                ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
                                AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
     {
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        byte major = (byte)8;
+        byte minor = (byte)0;
+        
         ExchangeBoundBody body = evt.getMethod();
 
         String exchangeName = body.exchange;
@@ -77,8 +82,11 @@
         AMQFrame response;
         if (exchange == null)
         {
-            response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND,
-                                                             "Exchange " + exchangeName + " not found");
+            // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+            response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                major, minor,	// AMQP version (major, minor)
+                EXCHANGE_NOT_FOUND,	// replyCode
+                "Exchange " + exchangeName + " not found");	// replyText
         }
         else if (routingKey == null)
         {
@@ -86,11 +94,19 @@
             {
                 if (exchange.hasBindings())
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        OK,	// replyCode
+                        null);	// replyText
                 }
                 else
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null);
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        NO_BINDINGS,	// replyCode
+                        null);	// replyText
                 }
             }
             else
@@ -98,20 +114,29 @@
                 AMQQueue queue = queueRegistry.getQueue(queueName);
                 if (queue == null)
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
-                                                                      "Queue " + queueName + " not found");
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        QUEUE_NOT_FOUND,	// replyCode
+                        "Queue " + queueName + " not found");	// replyText
                 }
                 else
                 {
                     if (exchange.isBound(queue))
                     {
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+                        // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                            major, minor,	// AMQP version (major, minor)
+                            OK,	// replyCode
+                            null);	// replyText
                     }
                     else
                     {
-                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND,
-                                                                      "Queue " + queueName + " not bound to exchange " +
-                                                                      exchangeName);
+                        // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                        response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                            major, minor,	// AMQP version (major, minor)
+                            QUEUE_NOT_BOUND,	// replyCode
+                            "Queue " + queueName + " not bound to exchange " + exchangeName);	// replyText
                     }
                 }
             }
@@ -121,24 +146,30 @@
             AMQQueue queue = queueRegistry.getQueue(queueName);
             if (queue == null)
             {
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
-                                                                  "Queue " + queueName + " not found");
+                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                    major, minor,	// AMQP version (major, minor)
+                    QUEUE_NOT_FOUND,	// replyCode
+                    "Queue " + queueName + " not found");	// replyText
             }
             else
             {
                 if (exchange.isBound(body.routingKey, queue))
                 {
-                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
-                                                                     null);
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                    response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                        major, minor,	// AMQP version (major, minor)
+                        OK,	// replyCode
+                        null);	// replyText
                 }
                 else
                 {
+                    // AMQP version change:  Be aware of possible changes to parameter order as versions change.
                     response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                                                                     SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,
-                                                                     "Queue " + queueName +
-                                                                     " not bound with routing key " +
-                                                                     body.routingKey + " to exchange " +
-                                                                     exchangeName);
+                        major, minor,	// AMQP version (major, minor)
+                        SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,	// replyCode
+                        "Queue " + queueName + " not bound with routing key " +
+                        body.routingKey + " to exchange " + exchangeName);	// replyText
                 }
             }
         }
@@ -146,16 +177,20 @@
         {
             if (exchange.isBound(body.routingKey))
             {
-                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
-                                                                 null);
+                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
+                response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+                    major, minor,	// AMQP version (major, minor)
+                    OK,	// replyCode
+                    null);	// replyText
             }
             else
             {
+                // AMQP version change:  Be aware of possible changes to parameter order as versions change.
                 response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
-                                                                 NO_QUEUE_BOUND_WITH_RK,
-                                                                 "No queue bound with routing key " +
-                                                                 body.routingKey + " to exchange " +
-                                                                 exchangeName);
+                    major, minor,	// AMQP version (major, minor)
+                    NO_QUEUE_BOUND_WITH_RK,	// replyCode
+                    "No queue bound with routing key " + body.routingKey +
+                    " to exchange " + exchangeName);	// replyText
             }
         }
         protocolSession.writeFrame(response);

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Tue Jan 16 07:16:39 2007
@@ -75,7 +75,10 @@
         }
         if(!body.nowait)
         {
-            AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId());
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
             protocolSession.writeFrame(response);
         }
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Tue Jan 16 07:16:39 2007
@@ -53,7 +53,10 @@
         try
         {
             exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
-            AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId());
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
             protocolSession.writeFrame(response);
         }
         catch (ExchangeInUseException e)

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Tue Jan 16 07:16:39 2007
@@ -90,7 +90,10 @@
         }
         if (!body.nowait)
         {
-            final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId());
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
             protocolSession.writeFrame(response);
         }
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Tue Jan 16 07:16:39 2007
@@ -102,7 +102,14 @@
         }
         if (!body.nowait)
         {
-            AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), body.queue, 0L, 0L);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                0L, // consumerCount
+                0L, // messageCount
+                body.queue); // queue
             _log.info("Queue " + body.queue + " declared successfully");
             protocolSession.writeFrame(response);
         }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Tue Jan 16 07:16:39 2007
@@ -81,7 +81,12 @@
         {
             int purged = queue.delete(body.ifUnused, body.ifEmpty);
             _store.removeQueue(queue.getName());
-            session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), purged));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                purged));	// messageCount
         }
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Tue Jan 16 07:16:39 2007
@@ -52,7 +52,10 @@
         try{
             AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
             channel.commit();
-            protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
             channel.processReturns(protocolSession);            
         }catch(AMQException e){
             throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Tue Jan 16 07:16:39 2007
@@ -51,7 +51,10 @@
         try{
             AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
             channel.rollback();
-            protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId()));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
             //Now resend all the unacknowledged messages back to the original subscribers.
             //(Must be done after the TxnRollback-ok response).
             channel.resend(protocolSession);

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Tue Jan 16 07:16:39 2007
@@ -48,6 +48,9 @@
                                AMQMethodEvent<TxSelectBody> evt) throws AMQException
     {
         protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
-        protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId()));
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
     }
 }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue Jan 16 07:16:39 2007
@@ -165,8 +165,14 @@
                 _minor = pi.protocolMinor;
                 String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
                 String locales = "en_US";
-                AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
-                                                                       mechanisms.getBytes(), locales.getBytes());
+                // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
+                AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
+            		_major, _minor,	// AMQP version (major, minor)
+                    locales.getBytes(),	// locales
+                    mechanisms.getBytes(),	// mechanisms
+                    null,	// serverProperties
+                	(short)_major,	// versionMajor
+                    (short)_minor);	// versionMinor
                 _minaProtocolSession.write(response);
             }
             catch (AMQException e)
@@ -316,8 +322,13 @@
         return _channelMap.get(channelId);
     }
 
-    public void addChannel(AMQChannel channel)
+    public void addChannel(AMQChannel channel) throws AMQException
     {
+        if (_closed)
+        {
+            throw new AMQException("Session is closed");    
+        }
+
         _channelMap.put(channel.getChannelId(), channel);
         checkForNotification();
     }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Tue Jan 16 07:16:39 2007
@@ -168,11 +168,20 @@
         }
         else if(throwable instanceof IOException)
         {
-            _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);            
+            _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
         }
         else
         {
-            protocolSession.write(ConnectionCloseBody.createAMQFrame(0, 200, throwable.getMessage(), 0, 0));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+            	(byte)8, (byte)0,	// AMQP version (major, minor)
+            	0,	// classId
+                0,	// methodId
+                200,	// replyCode
+                throwable.getMessage()	// replyText
+                ));
             _logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
             protocolSession.close();
         }

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Jan 16 07:16:39 2007
@@ -70,7 +70,7 @@
      * @param channel the channel to associate with this session. It is an error to
      * associate the same channel with more than one session but this is not validated.
      */
-    void addChannel(AMQChannel channel);
+    void addChannel(AMQChannel channel) throws AMQException;
 
     /**
      * Close a specific channel. This will remove any resources used by the channel, including:

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Tue Jan 16 07:16:39 2007
@@ -18,6 +18,9 @@
 package org.apache.qpid.server.protocol;
 
 import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.AMQFrame;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.management.AMQManagedObject;
 import org.apache.qpid.server.management.MBeanConstructor;
@@ -183,33 +186,25 @@
     }
 
     /**
-     * @see AMQMinaProtocolSession#closeChannel(int) 
-     */
-    public void closeChannel(int id) throws JMException
-    {
-        try
-        {
-            AMQChannel channel = _session.getChannel(id);
-            if (channel == null)
-            {
-                throw new JMException("The channel (channel Id = " + id + ") does not exist");
-            }
-
-            _session.closeChannel(id);
-        }
-        catch (AMQException ex)
-        {
-            throw new MBeanException(ex, ex.toString());
-        }
-    }
-
-    /**
      * closes the connection. The administrator can use this management operation to close connection to free up
      * resources.
      * @throws JMException
      */
     public void closeConnection() throws JMException
     {
+        
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
+            (byte)8, (byte)0,	// AMQP version (major, minor)
+            0,	// classId
+            0,	// methodId
+        	AMQConstant.REPLY_SUCCESS.getCode(),	// replyCode
+            "Broker Management Console has closing the connection."	// replyText
+            );
+        _session.writeFrame(response);
+
         try
         {
             _session.closeSession();

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java Tue Jan 16 07:16:39 2007
@@ -114,15 +114,6 @@
     void rollbackTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException;
 
     /**
-     * Unsubscribes the consumers and unregisters the channel from managed objects.
-     */
-    @MBeanOperation(name="closeChannel",
-                    description="Closes the channel with given channel Id and connected consumers will be unsubscribed",
-                    impact= MBeanOperationInfo.ACTION)
-    void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId)
-        throws Exception;
-
-    /**
      * Closes all the related channels and unregisters this connection from managed objects.
      */
     @MBeanOperation(name="closeConnection",

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Jan 16 07:16:39 2007
@@ -157,10 +157,20 @@
 
     public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
     {
+        
         AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
 
-        allFrames[0] = BasicDeliverBody.createAMQFrame(channel, consumerTag, deliveryTag, _redelivered,
-                                                       getExchangeName(), getRoutingKey());
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        allFrames[0] = BasicDeliverBody.createAMQFrame(channel,
+        	(byte)8, (byte)0,	// AMQP version (major, minor)
+            consumerTag,	// consumerTag
+        	deliveryTag,	// deliveryTag
+            getExchangeName(),	// exchange
+            _redelivered,	// redelivered
+            getRoutingKey()	// routingKey
+            );
         allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
         for (int i = 2; i < allFrames.length; i++)
         {

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Jan 16 07:16:39 2007
@@ -213,7 +213,10 @@
             {
                 return;
             }
-            _log.info("Async Delivery Message:" + message + " to :" + sub);
+            if (_log.isDebugEnabled())
+            {
+                _log.debug("Async Delivery Message:" + message + " to :" + sub);
+            }
 
             sub.send(message, _queue);
 
@@ -278,7 +281,10 @@
 
     public void deliver(String name, AMQMessage msg) throws FailedDequeueException
     {
-        _log.info(id() + "deliver :" + System.identityHashCode(msg));
+        if (_log.isDebugEnabled())
+        {
+            _log.debug(id() + "deliver :" + System.identityHashCode(msg));
+        }
 
         //Check if we have someone to deliver the message to.
         _lock.lock();
@@ -288,7 +294,10 @@
 
             if (s == null) //no-one can take the message right now.
             {
-                _log.info(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
+                }
                 if (!msg.isImmediate())
                 {
                     addMessageToQueue(msg);
@@ -297,21 +306,33 @@
                     _lock.unlock();
 
                     //Pre Deliver to all subscriptions
-                    _log.info(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
+                    if (_log.isDebugEnabled())
+                    {
+                        _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() + 
+                                   " subscribers to give the message to.");
+                    }
                     for (Subscription sub : _subscriptions.getSubscriptions())
                     {
 
                         // stop if the message gets delivered whilst PreDelivering if we have a shared queue.
                         if (_queue.isShared() && msg.getDeliveredToConsumer())
                         {
-                            _log.info(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered.");
+                            if (_log.isDebugEnabled())
+                            {
+                                _log.debug(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
+                                           ") is already delivered.");
+                            }
                             continue;
                         }
 
                         // Only give the message to those that want them.
                         if (sub.hasInterest(msg))
                         {
-                            _log.info(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
+                            if (_log.isDebugEnabled())
+                            {
+                                _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) + 
+                                           ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
+                            }
                             sub.enqueueForPreDelivery(msg);
                         }
                     }
@@ -322,7 +343,11 @@
                 //release lock now
                 _lock.unlock();
 
-                _log.info(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s);
+                if (_log.isDebugEnabled())
+                {
+                    _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + 
+                               System.identityHashCode(s) + ") :" + s);
+                }
                 //Deliver the message
                 s.send(msg, _queue);
             }
@@ -330,7 +355,7 @@
         finally
         {
             //ensure lock is released
-            if (_lock.isLocked())
+            if (_lock.isHeldByCurrentThread())
             {
                 _lock.unlock();
             }
@@ -371,9 +396,12 @@
 
     public void processAsync(Executor executor)
     {
-        _log.info("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
-                  " Active:" + _subscriptions.hasActiveSubscribers() +
-                  " Processing:" + _processing.get());
+        if (_log.isDebugEnabled())
+        {
+            _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
+                       " Active:" + _subscriptions.hasActiveSubscribers() +
+                       " Processing:" + _processing.get());
+        }
 
         if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
         {

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Jan 16 07:16:39 2007
@@ -379,7 +379,13 @@
         if (!_closed)
         {
             _logger.info("Closing autoclose subscription:" + this);
-            protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
+        		(byte)8, (byte)0,	// AMQP version (major, minor)
+            	consumerTag	// consumerTag
+                ));
             _closed = true;
         }
     }
@@ -392,9 +398,17 @@
 
     private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
     {
-        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
-                                                                deliveryTag, false, exchange,
-                                                                routingKey);
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
+        AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
+        	(byte)8, (byte)0,	// AMQP version (major, minor)
+            consumerTag,	// consumerTag
+        	deliveryTag,	// deliveryTag
+            exchange,	// exchange
+            false,	// redelivered
+            routingKey	// routingKey
+            );
         ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
         deliverFrame.writePayload(buf);
         buf.flip();

Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Tue Jan 16 07:16:39 2007
@@ -24,17 +24,18 @@
 import java.util.HashMap;
 
 import junit.framework.TestCase;
+import org.apache.qpid.framing.FieldTable;
 
 /**
  */
 public class HeadersBindingTest extends TestCase
 {
-    private Map<String, String> bindHeaders = new HashMap<String, String>();
-    private Map<String, String> matchHeaders = new HashMap<String, String>();
+    private FieldTable bindHeaders = new FieldTable();
+    private FieldTable matchHeaders = new FieldTable();
 
     public void testDefault_1()
     {
-        bindHeaders.put("A", "Value of A");
+        bindHeaders.setString("A", "Value of A");
 
         matchHeaders.put("A", "Value of A");
 

Modified: incubator/qpid/branches/perftesting/qpid/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/pom.xml?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/pom.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/pom.xml Tue Jan 16 07:16:39 2007
@@ -35,7 +35,6 @@
 
     <properties>
         <topDirectoryLocation>..</topDirectoryLocation>
-        <amqj.logging.level>warn</amqj.logging.level>
     </properties>
 
     <dependencies>
@@ -96,6 +95,11 @@
 
     <build>
         <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-antrun-plugin</artifactId>
+            </plugin>
+
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>

Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Jan 16 07:16:39 2007
@@ -465,12 +465,25 @@
     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
             throws AMQException
     {
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
         _protocolHandler.syncWrite(
-                ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class);
+            ChannelOpenBody.createAMQFrame(channelId,
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                null),	// outOfBand
+                ChannelOpenOkBody.class);
 
         //todo send low water mark when protocol allows.
+        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+        // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+        // Be aware of possible changes to parameter order as versions change.
         _protocolHandler.syncWrite(
-                BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false),
+            BasicQosBody.createAMQFrame(channelId,
+                (byte)8, (byte)0,	// AMQP version (major, minor)
+                false,	// global
+                prefetchHigh,	// prefetchCount
+                0),	// prefetchSize
                 BasicQosOkBody.class);
 
         if (transacted)
@@ -479,7 +492,10 @@
             {
                 _logger.debug("Issuing TxSelect for " + channelId);
             }
-            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class);
+            // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+            // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+            // Be aware of possible changes to parameter order as versions change.
+            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class);
         }
     }