You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/29 11:59:38 UTC

svn commit: r501003 - in /incubator/qpid/trunk/qpid: gentools/templ.java/ java/broker/src/main/java/org/apache/qpid/server/handler/ java/broker/src/main/java/org/apache/qpid/server/protocol/ java/broker/src/main/java/org/apache/qpid/server/queue/ java/...

Author: rgreig
Date: Mon Jan 29 02:59:33 2007
New Revision: 501003

URL: http://svn.apache.org/viewvc?view=rev&rev=501003
Log:
QPID-320 : Patch supplied by Rob Godfrey - Improve performance by remembering protocol version

Added:
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java
Modified:
    incubator/qpid/trunk/qpid/gentools/templ.java/MethodBodyClass.tmpl
    incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java

Modified: incubator/qpid/trunk/qpid/gentools/templ.java/MethodBodyClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/gentools/templ.java/MethodBodyClass.tmpl?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/gentools/templ.java/MethodBodyClass.tmpl (original)
+++ incubator/qpid/trunk/qpid/gentools/templ.java/MethodBodyClass.tmpl Mon Jan 29 02:59:33 2007
@@ -42,6 +42,12 @@
         {
             return new ${CLASS}${METHOD}Body(major, minor, in);
         }
+		
+	    public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer in, long size) throws AMQFrameDecodingException
+        {
+            return new ${CLASS}${METHOD}Body(major, minor, clazzID, methodID, in);
+        }
+ 
     };
     
     public static AMQMethodBodyInstanceFactory getFactory()
@@ -82,12 +88,18 @@
 
     public ${CLASS}${METHOD}Body(byte major, byte minor, ByteBuffer buffer) throws AMQFrameDecodingException
     {
+		this(major, minor, getClazz(major,minor), getMethod(major,minor), buffer);
+	}
+	
+    public ${CLASS}${METHOD}Body(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer) throws AMQFrameDecodingException
+    {
+
         super(major, minor);
-        _clazz = getClazz(major,minor); 
-        _method = getMethod(major,minor); 
+        _clazz = clazzID; 
+        _method = methodID; 
         %{FLIST}    ${mb_field_decode}
     }
-    public ${CLASS}${METHOD}Body(byte major, byte minor
+    public ${CLASS}${METHOD}Body(byte major, byte minor, int clazzID, int methodID
     %{FLIST}    ${mb_field_parameter_list}
                                          )
     {
@@ -149,7 +161,19 @@
 %{FLIST}    ${mb_field_parameter_list}
                                          )
     {
-        ${CLASS}${METHOD}Body body = new ${CLASS}${METHOD}Body(major, minor
+        return createAMQFrame(channelId, major, minor, getClazz(major,minor), getMethod(major,minor)
+%{FLIST}    ${mb_field_passed_parameter_list}
+        );
+
+                 
+        
+    }
+	
+	public static AMQFrame createAMQFrame(int channelId, byte major, byte minor, int clazzID, int methodID
+%{FLIST}    ${mb_field_parameter_list}
+                                         )
+    {
+        ${CLASS}${METHOD}Body body = new ${CLASS}${METHOD}Body(major, minor, clazzID, methodID
 %{FLIST}    ${mb_field_passed_parameter_list}
         );
 
@@ -157,4 +181,5 @@
         AMQFrame frame = new AMQFrame(channelId, body);
         return frame;
     }
+
 }

Modified: incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl (original)
+++ incubator/qpid/trunk/qpid/gentools/templ.java/MethodRegistryClass.tmpl Mon Jan 29 02:59:33 2007
@@ -33,13 +33,19 @@
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
 
-class MainRegistry
+public class MainRegistry
 {
 	private static final HashMap<Long, AMQMethodBodyInstanceFactory> classIDMethodIDVersionBodyMap = new HashMap<Long, AMQMethodBodyInstanceFactory>();
 
 	
     private static final Logger _log = Logger.getLogger(MainRegistry.class);
 	
+
+    private static final int DEFAULT_MINOR_VERSION_COUNT = 10;
+    private static final int DEFAULT_MAJOR_VERSION_COUNT = 10;
+    
+    private static VersionSpecificRegistry[][] _specificRegistries = new VersionSpecificRegistry[DEFAULT_MAJOR_VERSION_COUNT][];
+    	
     static
     {
 %{CLIST}	${reg_map_put_method}
@@ -48,7 +54,9 @@
     public static AMQMethodBody get(short classID, short methodID, byte major, byte minor, ByteBuffer in, long size)
         throws AMQFrameDecodingException
     {
-        AMQMethodBodyInstanceFactory bodyFactory = classIDMethodIDVersionBodyMap.get(createMapKey(classID,methodID,major,minor));
+		VersionSpecificRegistry registry = getVersionSpecificRegistry(major, minor);
+        AMQMethodBodyInstanceFactory bodyFactory = registry.getMethodBody(classID,methodID);
+		
         if (bodyFactory == null)
         {
             throw new AMQFrameDecodingException(_log,
@@ -59,23 +67,65 @@
 
 	    
     }
+	
+	public static VersionSpecificRegistry getVersionSpecificRegistry(byte major, byte minor)
+	{
+		try
+		{
+			return _specificRegistries[(int)major][(int)minor];
+		}
+		catch (IndexOutOfBoundsException e)
+		{
+			return null;
+		}
+		catch (NullPointerException e)
+		{
+			return null;
+		}
+		
+		
+	}
     
+	private static VersionSpecificRegistry addVersionSpecificRegistry(byte major, byte minor)
+	{
+		VersionSpecificRegistry[][] registries = _specificRegistries;
+		if(major >= registries.length)
+		{
+			_specificRegistries = new VersionSpecificRegistry[(int)major + 1][];
+			System.arraycopy(registries, 0, _specificRegistries, 0, registries.length);
+			registries = _specificRegistries;
+		}
+		if(registries[major] == null)
+		{
+			registries[major] = new VersionSpecificRegistry[ minor >= DEFAULT_MINOR_VERSION_COUNT ? minor + 1 : DEFAULT_MINOR_VERSION_COUNT ];
+		}
+		else if(registries[major].length <= minor)
+		{
+			VersionSpecificRegistry[] minorArray = registries[major];
+			registries[major] = new VersionSpecificRegistry[ minor + 1 ];
+			System.arraycopy(minorArray, 0, registries[major], 0, minorArray.length);
+			
+		}
+		
+		VersionSpecificRegistry newRegistry = new VersionSpecificRegistry(major,minor);
+		
+		registries[major][minor] = newRegistry;
+		
+		return newRegistry;
+	}
+		
 	private static void registerMethod(short classID, short methodID, byte major, byte minor, AMQMethodBodyInstanceFactory instanceFactory )
 	{
-        classIDMethodIDVersionBodyMap.put(createMapKey(classID,methodID,major,minor), instanceFactory);
+		VersionSpecificRegistry registry = getVersionSpecificRegistry(major,minor);
+		if(registry == null)
+		{
+			registry = addVersionSpecificRegistry(major,minor);
+			
+		}
+		
+		registry.registerMethod(classID, methodID, instanceFactory);
+        
     }
 
     
-    private static Long createMapKey(short classID, short methodID, byte major, byte minor)
-    {
-    	/**
-         *	Mapping of 4 components into a guaranteed unique key:
-         *  MSB                                     LSB
-         *  +----+----+----+----+----+----+-----+-----+
-         *  |    0    | classID |methodID |major|minor|
-         *  +----+----+----+----+----+----+-----+-----+
-         */
-    	return new Long(((long)classID << 32) + ((long)methodID << 16) + ((long)major << 8) + minor);
-    }
-
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java Mon Jan 29 02:59:33 2007
@@ -50,6 +50,6 @@
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
-        session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0)));
+        session.writeFrame(BasicQosOkBody.createAMQFrame(evt.getChannelId(),(byte)8, (byte)0));
     }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Mon Jan 29 02:59:33 2007
@@ -61,6 +61,9 @@
 
     private static final String CLIENT_PROPERTIES_INSTANCE = ClientProperties.instance.toString();
 
+    // to save boxing the channelId and looking up in a map... cache in an array the low numbered
+    // channels.  This value must be of the form 2^x - 1.
+    private static final int CHANNEL_CACHE_SIZE = 0xff;
 
     private final IoSession _minaProtocolSession;
 
@@ -70,6 +73,8 @@
 
     private final Map<Integer, AMQChannel> _channelMap = new HashMap<Integer, AMQChannel>();
 
+    private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE+1];
+
     private final CopyOnWriteArraySet<AMQMethodListener> _frameListeners = new CopyOnWriteArraySet<AMQMethodListener>();
 
     private final AMQStateManager _stateManager;
@@ -89,10 +94,12 @@
     private long _maxNoOfChannels = 1000;
 
     /* AMQP Version for this session */
-    private byte _major;
-    private byte _minor;
+    private byte _major = pv[pv.length-1][PROTOCOL_MAJOR];
+    private byte _minor = pv[pv.length-1][PROTOCOL_MINOR];
     private FieldTable _clientProperties;
     private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+    private VersionSpecificRegistry _registry = MainRegistry.getVersionSpecificRegistry(pv[pv.length-1][PROTOCOL_MAJOR],pv[pv.length-1][PROTOCOL_MINOR]);
+    
 
 
     public ManagedObject getManagedObject()
@@ -165,11 +172,14 @@
             try
             {
                 pi.checkVersion(this); // Fails if not correct
+
                 // This sets the protocol version (and hence framing classes) for this session.
-                _major = pi.protocolMajor;
-                _minor = pi.protocolMinor;
+                setProtocolVersion(pi.protocolMajor,pi.protocolMinor);
+
                 String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+
                 String locales = "en_US";
+
                 // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
                 AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
             		_major, _minor,	// AMQP version (major, minor)
@@ -200,7 +210,7 @@
         {
             AMQFrame frame = (AMQFrame) message;
 
-            if (frame.bodyFrame instanceof AMQMethodBody)
+            if (frame.getBodyFrame() instanceof AMQMethodBody)
             {
                 methodFrameReceived(frame);
             }
@@ -217,8 +227,8 @@
         {
             _logger.debug("Method frame received: " + frame);
         }
-        final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.channel,
-                                                                                    (AMQMethodBody) frame.bodyFrame);
+        final AMQMethodEvent<AMQMethodBody> evt = new AMQMethodEvent<AMQMethodBody>(frame.getChannel(),
+                                                                                    (AMQMethodBody) frame.getBodyFrame());
         try
         {
             try
@@ -241,14 +251,14 @@
             catch (AMQChannelException e)
             {
                 _logger.error("Closing channel due to: " + e.getMessage());
-                writeFrame(e.getCloseFrame(frame.channel));
-                closeChannel(frame.channel);
+                writeFrame(e.getCloseFrame(frame.getChannel()));
+                closeChannel(frame.getChannel());
             }
             catch (AMQConnectionException e)
             {
                 _logger.error("Closing connection due to: " + e.getMessage());
                 closeSession();
-                writeFrame(e.getCloseFrame(frame.channel));
+                writeFrame(e.getCloseFrame(frame.getChannel()));
             }
         }
         catch (Exception e)
@@ -264,15 +274,15 @@
 
     private void contentFrameReceived(AMQFrame frame) throws AMQException
     {
-        if (frame.bodyFrame instanceof ContentHeaderBody)
+        if (frame.getBodyFrame() instanceof ContentHeaderBody)
         {
             contentHeaderReceived(frame);
         }
-        else if (frame.bodyFrame instanceof ContentBody)
+        else if (frame.getBodyFrame() instanceof ContentBody)
         {
             contentBodyReceived(frame);
         }
-        else if (frame.bodyFrame instanceof HeartbeatBody)
+        else if (frame.getBodyFrame() instanceof HeartbeatBody)
         {
             _logger.debug("Received heartbeat from client");
         }
@@ -288,7 +298,7 @@
         {
             _logger.debug("Content header frame received: " + frame);
         }
-        getChannel(frame.channel).publishContentHeader((ContentHeaderBody) frame.bodyFrame);
+        getChannel(frame.getChannel()).publishContentHeader((ContentHeaderBody) frame.getBodyFrame());
     }
 
     private void contentBodyReceived(AMQFrame frame) throws AMQException
@@ -297,7 +307,7 @@
         {
             _logger.debug("Content body frame received: " + frame);
         }
-        getChannel(frame.channel).publishContentBody((ContentBody)frame.bodyFrame, this);
+        getChannel(frame.getChannel()).publishContentBody((ContentBody)frame.getBodyFrame(), this);
     }
 
     /**
@@ -329,7 +339,9 @@
 
     public AMQChannel getChannel(int channelId) throws AMQException
     {
-        return _channelMap.get(channelId);
+        return ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+                ? _cachedChannels[channelId]
+                : _channelMap.get(channelId);
     }
 
     public void addChannel(AMQChannel channel) throws AMQException
@@ -339,7 +351,13 @@
             throw new AMQException("Session is closed");    
         }
 
-        _channelMap.put(channel.getChannelId(), channel);
+        final int channelId = channel.getChannelId();
+        _channelMap.put(channelId, channel);
+
+        if(((channelId & CHANNEL_CACHE_SIZE) == channelId))
+        {
+            _cachedChannels[channelId] = channel;
+        }
         checkForNotification();
     }
 
@@ -389,7 +407,7 @@
      */
     public void closeChannel(int channelId) throws AMQException
     {
-        final AMQChannel channel = _channelMap.get(channelId);
+        final AMQChannel channel = getChannel(channelId);
         if (channel == null)
         {
             throw new IllegalArgumentException("Unknown channel id");
@@ -402,7 +420,8 @@
             }
             finally
             {
-                _channelMap.remove(channelId);
+                removeChannel(channelId);
+
             }
         }
     }
@@ -415,6 +434,10 @@
     public void removeChannel(int channelId)
     {
         _channelMap.remove(channelId);
+        if((channelId & CHANNEL_CACHE_SIZE) == channelId)
+        {
+            _cachedChannels[channelId] = null;
+        }
     }
 
     /**
@@ -444,6 +467,10 @@
             channel.close(this);
         }
         _channelMap.clear();
+        for(int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
+        {
+            _cachedChannels[i]=null;
+        }
     }
 
     /**
@@ -534,10 +561,12 @@
         }
     }
 
-    /**
-     * Convenience methods for managing AMQP version.
-     * NOTE: Both major and minor will be set to 0 prior to protocol initiation.
-     */
+    private void setProtocolVersion(byte major, byte minor)
+    {
+        _major = major;
+        _minor = minor;
+        _registry = MainRegistry.getVersionSpecificRegistry(major,minor);
+    }
 
     public byte getProtocolMajorVersion()
     {
@@ -553,6 +582,12 @@
     {
         return _major == major && _minor == minor;
     }
+
+    public VersionSpecificRegistry getRegistry()
+    {
+        return _registry;
+    }
+
 
 
     public Object getClientIdentifier()

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Mon Jan 29 02:59:33 2007
@@ -120,7 +120,10 @@
         _logger.info("Protocol Session closed");
         final AMQProtocolSession amqProtocolSession = AMQMinaProtocolSession.getAMQProtocolSession(protocolSession);
         //fixme  -- this can be null
-        amqProtocolSession.closeSession();
+        if(amqProtocolSession != null)
+        {
+            amqProtocolSession.closeSession();
+        }
     }
 
     public void sessionIdle(IoSession session, IdleStatus status) throws Exception

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Mon Jan 29 02:59:33 2007
@@ -24,6 +24,7 @@
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.protocol.AMQProtocolWriter;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.server.AMQChannel;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.AMQException;
@@ -31,7 +32,7 @@
 import javax.security.sasl.SaslServer;
 
 
-public interface AMQProtocolSession extends AMQProtocolWriter
+public interface AMQProtocolSession extends AMQVersionAwareProtocolSession
 {
 
 
@@ -143,9 +144,5 @@
     void addSessionCloseTask(Task task);
 
     void removeSessionCloseTask(Task task);
-
-    byte getProtocolMajorVersion();
-
-    byte getProtocolMinorVersion();
 
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Jan 29 02:59:33 2007
@@ -53,7 +53,7 @@
      */
     private AMQProtocolSession _publisher;
 
-    private final long _messageId;
+    private final Long _messageId;
 
     private final AtomicInteger _referenceCount = new AtomicInteger(1);
 
@@ -68,6 +68,13 @@
      * messages published with the 'immediate' flag.
      */
     private boolean _deliveredToConsumer;
+    /**
+     * We need to keep track of whether the message was 'immediate'
+     * as in extreme circumstances, when the checkDeliveredToConsumer
+     * is called, the message may already have been received and acknowledged,
+     * and the body removed from the store.
+     */
+    private boolean _immediate;
 
     private AtomicBoolean _taken = new AtomicBoolean(false);
 
@@ -160,11 +167,12 @@
         }
     }
 
-    public AMQMessage(long messageId, BasicPublishBody publishBody,
+    public AMQMessage(Long messageId, BasicPublishBody publishBody,
                       TransactionalContext txnContext)
     {
         _messageId = messageId;
         _txnContext = txnContext;
+        _immediate = publishBody.immediate;
         _transientMessageData.setPublishBody(publishBody);
 
         _taken = new AtomicBoolean(false);
@@ -183,7 +191,7 @@
      * @param factory
      * @throws AMQException
      */
-    public AMQMessage(long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException
+    public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory factory) throws AMQException
     {
         _messageId = messageId;
         _messageHandle = factory.createMessageHandle(messageId, store, true);
@@ -198,7 +206,7 @@
      * @param txnContext
      * @param contentHeader
      */
-    public AMQMessage(long messageId, BasicPublishBody publishBody,
+    public AMQMessage(Long messageId, BasicPublishBody publishBody,
                       TransactionalContext txnContext, ContentHeaderBody contentHeader) throws AMQException
     {
         this(messageId, publishBody, txnContext);
@@ -216,7 +224,7 @@
      * @param contentBodies
      * @throws AMQException
      */
-    public AMQMessage(long messageId, BasicPublishBody publishBody,
+    public AMQMessage(Long messageId, BasicPublishBody publishBody,
                       TransactionalContext txnContext,
                       ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
                       List<ContentBody> contentBodies, MessageStore messageStore, StoreContext storeContext,
@@ -293,8 +301,9 @@
     public boolean addContentBodyFrame(StoreContext storeContext, ContentBody contentBody) throws AMQException
     {
         _transientMessageData.addBodyLength(contentBody.getSize());
-        _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody);
-        if (isAllContentReceived())
+        final boolean allContentReceived = isAllContentReceived();
+        _messageHandle.addContentBodyFrame(storeContext, _messageId, contentBody, allContentReceived);
+        if (allContentReceived)
         {
             deliver(storeContext);
             return true;
@@ -348,6 +357,7 @@
                 {
                     _log.debug("Ref count on message " + _messageId + " is zero; removing message");
                 }
+
                 // must check if the handle is null since there may be cases where we decide to throw away a message
                 // and the handle has not yet been constructed
                 if (_messageHandle != null)
@@ -372,6 +382,10 @@
                     Thread.dumpStack();
                 }
             }
+            if(_referenceCount.get()<0)
+            {
+                throw new MessageCleanupException("Reference count for message id " + _messageId + " has gone below 0.");
+            }
         }
     }
 
@@ -464,8 +478,8 @@
      */
     public void checkDeliveredToConsumer() throws NoConsumersException, AMQException
     {
-        BasicPublishBody pb = getPublishBody();
-        if (pb.immediate && !_deliveredToConsumer)
+
+        if (_immediate && !_deliveredToConsumer)
         {
             throw new NoConsumersException(this);
         }        

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java Mon Jan 29 02:59:33 2007
@@ -35,17 +35,17 @@
  */
 public interface AMQMessageHandle
 {
-    ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException;
+    ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException;
 
     /**
      * @return the number of body frames associated with this message
      */
-    int getBodyCount(long messageId) throws AMQException;
+    int getBodyCount(Long messageId) throws AMQException;
 
     /**
      * @return the size of the body
      */
-    long getBodySize(long messageId) throws AMQException;
+    long getBodySize(Long messageId) throws AMQException;
 
     /**
      * Get a particular content body
@@ -53,25 +53,25 @@
      * @return a content body
      * @throws IllegalArgumentException if the index is invalid
      */
-    ContentBody getContentBody(long messageId, int index) throws IllegalArgumentException, AMQException;
+    ContentBody getContentBody(Long messageId, int index) throws IllegalArgumentException, AMQException;
 
-    void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException;
+    void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException;
 
-    BasicPublishBody getPublishBody(long messageId) throws AMQException;
+    BasicPublishBody getPublishBody(Long messageId) throws AMQException;
 
     boolean isRedelivered();
 
     void setRedelivered(boolean redelivered);
 
-    boolean isPersistent(long messageId) throws AMQException;
+    boolean isPersistent(Long messageId) throws AMQException;
 
-    void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
+    void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
                                         ContentHeaderBody contentHeaderBody)
             throws AMQException;
 
-    void removeMessage(StoreContext storeContext, long messageId) throws AMQException;
+    void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
 
-    void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException;
+    void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
 
-    void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException;
+    void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException;
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Mon Jan 29 02:59:33 2007
@@ -252,6 +252,12 @@
         return _deliveryMgr.getMessages();
     }
 
+    public long getQueueDepth()
+    {
+        return _deliveryMgr.getTotalMessageSize();
+    }
+
+
     /**
      * @param messageId
      * @return AMQMessage with give id if exists. null if AMQMessage with given id doesn't exist.
@@ -315,13 +321,13 @@
         _maxMessageCount = value;
     }
 
-    public Long getMaximumQueueDepth()
+    public long getMaximumQueueDepth()
     {
         return _maxQueueDepth;
     }
 
     // Sets the queue depth, the max queue size
-    public void setMaximumQueueDepth(Long value)
+    public void setMaximumQueueDepth(long value)
     {
         _maxQueueDepth = value;
     }
@@ -624,5 +630,7 @@
     {
         _deleteTaskList.add(task);
     }
+
+
 
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Mon Jan 29 02:59:33 2007
@@ -191,25 +191,13 @@
      */
     public Long getQueueDepth() throws JMException
     {
-        List<AMQMessage> list = _queue.getMessagesOnTheQueue();
-        if (list.size() == 0)
-        {
-            return 0l;
-        }
+        return getQueueDepthKb();
+    }
 
-        long queueDepth = 0;
-        try
-        {
-            for (AMQMessage message : list)
-            {
-                queueDepth = queueDepth + getMessageSize(message);
-            }
-        }
-        catch (AMQException e)
-        {
-            throw new JMException("Unable to get message size: " + e);
-        }
-        return (long) Math.round(queueDepth / 1000);
+    public long getQueueDepthKb()
+    {
+        long queueBytesSize = _queue.getQueueDepth();
+        return queueBytesSize >> 10 ;
     }
 
     /**
@@ -245,7 +233,7 @@
         }
 
         // Check for threshold queue depth in bytes
-        long queueDepth = getQueueDepth();
+        long queueDepth = getQueueDepthKb();
         if (queueDepth >= _queue.getMaximumQueueDepth())
         {
             notifyClients("Queue depth(" + queueDepth + "), Queue size has reached the threshold high value");

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Mon Jan 29 02:59:33 2007
@@ -38,6 +38,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 /**
@@ -83,6 +84,7 @@
      * Lock is used to control access to hasQueuedMessages() and over the addition of messages to the queue.
      */
     private ReentrantLock _lock = new ReentrantLock();
+    private AtomicLong _totalMessageSize = new AtomicLong();
 
 
     ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions, AMQQueue queue)
@@ -116,6 +118,8 @@
 
         _messages.offer(msg);
 
+        _totalMessageSize.addAndGet(msg.getSize());
+
         return true;
     }
 
@@ -150,6 +154,13 @@
     }
 
 
+
+    public long getTotalMessageSize()
+    {
+        return _totalMessageSize.get();
+    }
+
+
     public synchronized List<AMQMessage> getMessages()
     {
         return new ArrayList<AMQMessage>(_messages);
@@ -213,6 +224,7 @@
                     }
 
                     msg.writeGetOk(protocolSession, channel.getChannelId(), deliveryTag, _queue.getMessageCount());
+                    _totalMessageSize.addAndGet(-msg.getSize());
                 }
             }
             finally
@@ -224,6 +236,7 @@
         }
     }
 
+
     public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
     {
         AMQMessage msg = poll();
@@ -231,6 +244,7 @@
         {
             msg.dequeue(storeContext, _queue);
         }
+        _totalMessageSize.getAndAdd(-msg.getSize());
     }
 
     public synchronized long clearAllMessages(StoreContext storeContext) throws AMQException
@@ -241,8 +255,11 @@
         {
             msg.dequeue(storeContext, _queue);
             count++;
+            _totalMessageSize.set(0L);
             msg = poll();
+
         }
+
         return count;
     }
 
@@ -292,6 +309,7 @@
 
             //remove sent message from our queue.
             messageQueue.poll();
+            _totalMessageSize.addAndGet(-message.getSize());
         }
         catch (AMQException e)
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java Mon Jan 29 02:59:33 2007
@@ -81,4 +81,6 @@
     void populatePreDeliveryQueue(Subscription subscription);
 
     boolean performGet(AMQProtocolSession session, AMQChannel channel, boolean acks) throws AMQException;
+
+    long getTotalMessageSize();
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java Mon Jan 29 02:59:33 2007
@@ -47,22 +47,22 @@
     {
     }
 
-    public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException
+    public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
     {
         return _contentHeaderBody;
     }
 
-    public int getBodyCount(long messageId)
+    public int getBodyCount(Long messageId)
     {
         return _contentBodies.size();
     }
 
-    public long getBodySize(long messageId) throws AMQException
+    public long getBodySize(Long messageId) throws AMQException
     {
         return getContentHeaderBody(messageId).bodySize;
     }
 
-    public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException
+    public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
     {
         if (index > _contentBodies.size() - 1)
         {
@@ -72,13 +72,13 @@
         return _contentBodies.get(index);
     }
 
-    public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody)
+    public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody)
             throws AMQException
     {
         _contentBodies.add(contentBody);
     }
 
-    public BasicPublishBody getPublishBody(long messageId) throws AMQException
+    public BasicPublishBody getPublishBody(Long messageId) throws AMQException
     {
         return _publishBody;
     }
@@ -94,7 +94,7 @@
         _redelivered = redelivered;
     }
 
-    public boolean isPersistent(long messageId) throws AMQException
+    public boolean isPersistent(Long messageId) throws AMQException
     {
         //todo remove literal values to a constant file such as AMQConstants in common
         ContentHeaderBody chb = getContentHeaderBody(messageId);
@@ -108,7 +108,7 @@
      * @param contentHeaderBody
      * @throws AMQException
      */
-    public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
+    public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
                                                ContentHeaderBody contentHeaderBody)
             throws AMQException
     {
@@ -116,17 +116,17 @@
         _contentHeaderBody = contentHeaderBody;
     }
 
-    public void removeMessage(StoreContext storeContext, long messageId) throws AMQException
+    public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
     {
         // NO OP
     }
 
-    public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+    public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
     {
         // NO OP
     }
 
-    public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+    public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
     {
         // NO OP
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageCleanupException.java Mon Jan 29 02:59:33 2007
@@ -32,4 +32,8 @@
     {
         super("Failed to cleanup message with id " + messageId, e);
     }
+    public MessageCleanupException(String message)
+    {
+        super(message);
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java Mon Jan 29 02:59:33 2007
@@ -31,7 +31,7 @@
 public class MessageHandleFactory
 {
 
-    public AMQMessageHandle createMessageHandle(long messageId, MessageStore store, boolean persistent)
+    public AMQMessageHandle createMessageHandle(Long messageId, MessageStore store, boolean persistent)
     {
         // just hardcoded for now
         if (persistent)

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Mon Jan 29 02:59:33 2007
@@ -32,6 +32,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.LinkedList;
+import java.util.Collections;
 
 /**
  * @author Robert Greig (robert.j.greig@jpmorgan.com)
@@ -48,12 +49,13 @@
 
     private final MessageStore _messageStore;
 
+
     public WeakReferenceMessageHandle(MessageStore messageStore)
     {
         _messageStore = messageStore;
     }
 
-    public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException
+    public ContentHeaderBody getContentHeaderBody(Long messageId) throws AMQException
     {
         ContentHeaderBody chb = (_contentHeaderBody != null?_contentHeaderBody.get():null);
         if (chb == null)
@@ -66,7 +68,7 @@
         return chb;
     }
 
-    public int getBodyCount(long messageId) throws AMQException
+    public int getBodyCount(Long messageId) throws AMQException
     {
         if (_contentBodies == null)
         {
@@ -81,12 +83,12 @@
         return _contentBodies.size();
     }
 
-    public long getBodySize(long messageId) throws AMQException
+    public long getBodySize(Long messageId) throws AMQException
     {
         return getContentHeaderBody(messageId).bodySize;
     }
 
-    public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException
+    public ContentBody getContentBody(Long messageId, int index) throws AMQException, IllegalArgumentException
     {
         if (index > _contentBodies.size() - 1)
         {
@@ -108,19 +110,30 @@
      * @param storeContext
      * @param messageId
      * @param contentBody
+     * @param isLastContentBody
      * @throws AMQException
      */
-    public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException
+    public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentBody contentBody, boolean isLastContentBody) throws AMQException
     {
-        if (_contentBodies == null)
+        if(_contentBodies == null && isLastContentBody)
         {
-            _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+            _contentBodies = Collections.singletonList(new WeakReference<ContentBody>(contentBody));
+
+        }
+        else
+        {
+            if (_contentBodies == null)
+            {
+                _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+            }
+
+
+            _contentBodies.add(new WeakReference<ContentBody>(contentBody));
         }
-        _contentBodies.add(new WeakReference<ContentBody>(contentBody));
-        _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody);
+        _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody, isLastContentBody);
     }
 
-    public BasicPublishBody getPublishBody(long messageId) throws AMQException
+    public BasicPublishBody getPublishBody(Long messageId) throws AMQException
     {
         BasicPublishBody bpb = (_publishBody != null?_publishBody.get():null);
         if (bpb == null)
@@ -143,7 +156,7 @@
         _redelivered = redelivered;
     }
 
-    public boolean isPersistent(long messageId) throws AMQException
+    public boolean isPersistent(Long messageId) throws AMQException
     {
         //todo remove literal values to a constant file such as AMQConstants in common
         ContentHeaderBody chb = getContentHeaderBody(messageId);
@@ -157,7 +170,7 @@
      * @param contentHeaderBody
      * @throws AMQException
      */
-    public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
+    public void setPublishAndContentHeaderBody(StoreContext storeContext, Long messageId, BasicPublishBody publishBody,
                                                ContentHeaderBody contentHeaderBody)
             throws AMQException
     {
@@ -173,17 +186,17 @@
         _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
     }
 
-    public void removeMessage(StoreContext storeContext, long messageId) throws AMQException
+    public void removeMessage(StoreContext storeContext, Long messageId) throws AMQException
     {
         _messageStore.removeMessage(storeContext, messageId);
     }
 
-    public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+    public void enqueue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
     {
         _messageStore.enqueueMessage(storeContext, queue.getName(), messageId);
     }
 
-    public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+    public void dequeue(StoreContext storeContext, Long messageId, AMQQueue queue) throws AMQException
     {
         _messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
     }

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Mon Jan 29 02:59:33 2007
@@ -36,6 +36,7 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.EnumMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 
 /**
@@ -59,8 +60,9 @@
      * Maps from an AMQState instance to a Map from Class to StateTransitionHandler.
      * The class must be a subclass of AMQFrame.
      */
-    private final Map<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
-            new HashMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>();
+    private final EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>> _state2HandlersMap =
+            new EnumMap<AMQState, Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>>(AMQState.class);
+
 
     private CopyOnWriteArraySet<StateListener> _stateListeners = new CopyOnWriteArraySet<StateListener>();
 
@@ -83,14 +85,7 @@
 
     protected void registerListeners()
     {
-        Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> frame2handlerMap =
-                new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
-
-        // we need to register a map for the null (i.e. all state) handlers otherwise you get
-        // a stack overflow in the handler searching code when you present it with a frame for which
-        // no handlers are registered
-        //
-        _state2HandlersMap.put(null, frame2handlerMap);
+        Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>> frame2handlerMap;
 
         frame2handlerMap = new HashMap<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>();
         frame2handlerMap.put(ConnectionStartOkBody.class, ConnectionStartOkMethodHandler.getInstance());
@@ -205,26 +200,14 @@
         final Map<Class<? extends AMQMethodBody>, StateAwareMethodListener<? extends AMQMethodBody>>
                 classToHandlerMap = _state2HandlersMap.get(currentState);
 
-        if (classToHandlerMap == null)
-        {
-            // if no specialised per state handler is registered look for a
-            // handler registered for "all" states
-            return findStateTransitionHandler(null, frame);
-        }
-        final StateAwareMethodListener<B> handler = (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
+        final StateAwareMethodListener<B> handler = classToHandlerMap == null
+                                                          ? null
+                                                          : (StateAwareMethodListener<B>) classToHandlerMap.get(frame.getClass());
+
         if (handler == null)
         {
-            if (currentState == null)
-            {
-                _logger.debug("No state transition handler defined for receiving frame " + frame);
-                return null;
-            }
-            else
-            {
-                // if no specialised per state handler is registered look for a
-                // handler registered for "all" states
-                return findStateTransitionHandler(null, frame);
-            }
+            _logger.debug("No state transition handler defined for receiving frame " + frame);
+            return null;
         }
         else
         {

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Mon Jan 29 02:59:33 2007
@@ -27,11 +27,11 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Collections;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
@@ -87,7 +87,7 @@
         }
     }
 
-    public void removeMessage(StoreContext context, long messageId)
+    public void removeMessage(StoreContext context, Long messageId)
     {
         if (_log.isDebugEnabled())
         {
@@ -107,12 +107,12 @@
         // Not required to do anything
     }
 
-    public void enqueueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
     {
         // Not required to do anything
     }
 
-    public void dequeueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException
+    public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
     {
         // Not required to do anything
     }
@@ -142,36 +142,44 @@
         return null;
     }
 
-    public long getNewMessageId()
+    public Long getNewMessageId()
     {
         return _messageId.getAndIncrement();
     }
 
-    public void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody)
+    public void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody)
             throws AMQException
     {
         List<ContentBody> bodyList = _contentBodyMap.get(messageId);
-        if (bodyList == null)
+
+        if(bodyList == null && lastContentBody)
         {
-            bodyList = new ArrayList<ContentBody>();
-            _contentBodyMap.put(messageId, bodyList);
+            _contentBodyMap.put(messageId, Collections.singletonList(contentBody));
         }
+        else
+        {
+            if (bodyList == null)
+            {
+                bodyList = new ArrayList<ContentBody>();
+                _contentBodyMap.put(messageId, bodyList);
+            }
 
-        bodyList.add(index, contentBody);
+            bodyList.add(index, contentBody);
+        }
     }
 
-    public void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData)
+    public void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData)
             throws AMQException
     {
         _metaDataMap.put(messageId, messageMetaData);
     }
 
-    public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+    public MessageMetaData getMessageMetaData(Long messageId) throws AMQException
     {
         return _metaDataMap.get(messageId);
     }
 
-    public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
+    public ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException
     {
         List<ContentBody> bodyList = _contentBodyMap.get(messageId);
         return bodyList.get(index);

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Mon Jan 29 02:59:33 2007
@@ -50,15 +50,15 @@
      */
     void close() throws Exception;
 
-    void removeMessage(StoreContext storeContext, long messageId) throws AMQException;
+    void removeMessage(StoreContext storeContext, Long messageId) throws AMQException;
 
     void createQueue(AMQQueue queue) throws AMQException;
 
     void removeQueue(AMQShortString name) throws AMQException;
 
-    void enqueueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException;
+    void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
 
-    void dequeueMessage(StoreContext context, AMQShortString name, long messageId) throws AMQException;
+    void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException;
 
     void beginTran(StoreContext context) throws AMQException;
 
@@ -79,14 +79,14 @@
      * Return a valid, currently unused message id.
      * @return a message id
      */
-    long getNewMessageId();
+    Long getNewMessageId();
 
-    void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) throws AMQException;
+    void storeContentBodyChunk(StoreContext context, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException;
 
-    void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) throws AMQException;
+    void storeMessageMetaData(StoreContext context, Long messageId, MessageMetaData messageMetaData) throws AMQException;
 
-    MessageMetaData getMessageMetaData(long messageId) throws AMQException;
+    MessageMetaData getMessageMetaData(Long messageId) throws AMQException;
 
-    ContentBody getContentBodyChunk(long messageId, int index) throws AMQException;
+    ContentBody getContentBodyChunk(Long messageId, int index) throws AMQException;
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Mon Jan 29 02:59:33 2007
@@ -24,7 +24,7 @@
 
 public abstract class AMQBody
 {
-    protected abstract byte getFrameType();
+    public abstract byte getFrameType();
     
     /** 
      * Get the size of the body

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Mon Jan 29 02:59:33 2007
@@ -24,25 +24,29 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 import java.util.HashMap;
 import java.util.Map;
 
 public class AMQDataBlockDecoder
 {
-    Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class);
+    private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
 
-    private final Map _supportedBodies = new HashMap();
+    private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
 
-    private final static BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
     static
     {
-        _bodiesSupported[AMQMethodBody.TYPE] = AMQMethodBodyFactory.getInstance();
         _bodiesSupported[ContentHeaderBody.TYPE] = ContentHeaderBodyFactory.getInstance();
         _bodiesSupported[ContentBody.TYPE] = ContentBodyFactory.getInstance();
         _bodiesSupported[HeartbeatBody.TYPE] = new HeartbeatBodyFactory();
     }
 
+
+    Logger _logger = Logger.getLogger(AMQDataBlockDecoder.class);
+
+
+
     public AMQDataBlockDecoder()
     {
     }
@@ -55,52 +59,57 @@
         {
             return false;
         }
-
-        final byte type = in.get();
-        final int channel = in.getUnsignedShort();
+        in.skip(1 + 2);
         final long bodySize = in.getUnsignedInt();
 
-        // bodySize can be zero
-        if (type <= 0 || channel < 0 || bodySize < 0)
-        {
-            throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
-                                                " bodySize = " + bodySize);
-        }
+
 
         return (remainingAfterAttributes >= bodySize);
 
     }
 
-    private boolean isSupportedFrameType(byte frameType)
+
+    protected Object createAndPopulateFrame(IoSession session, ByteBuffer in)
+                    throws AMQFrameDecodingException, AMQProtocolVersionException
     {
-        final boolean result = _bodiesSupported[frameType] != null;
+        final byte type = in.get();
+
+        BodyFactory bodyFactory;
+        if(type == AMQMethodBody.TYPE)
+        {
+            bodyFactory = (BodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+            if(bodyFactory == null)
+            {
+                AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+                bodyFactory = new AMQMethodBodyFactory(protocolSession);
+                session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
+
+            }
 
-        if (!result)
+        }
+        else
         {
-        	_logger.warn("AMQDataBlockDecoder does not handle frame type " + frameType);
+            bodyFactory = _bodiesSupported[type];
         }
 
-        return result;
-    }
 
-    protected Object createAndPopulateFrame(ByteBuffer in)
-                    throws AMQFrameDecodingException, AMQProtocolVersionException
-    {
-        final byte type = in.get();
-        BodyFactory bodyFactory = _bodiesSupported[type];
-        if (!isSupportedFrameType(type))
+
+
+        if(bodyFactory == null)
         {
             throw new AMQFrameDecodingException("Unsupported frame type: " + type);
         }
+
         final int channel = in.getUnsignedShort();
         final long bodySize = in.getUnsignedInt();
 
-        /*
-        if (bodyFactory == null)
+        // bodySize can be zero
+        if (channel < 0 || bodySize < 0)
         {
-            throw new AMQFrameDecodingException("Unsupported body type: " + type);
+            throw new AMQFrameDecodingException("Undecodable frame: type = " + type + " channel = " + channel +
+                                                " bodySize = " + bodySize);
         }
-        */
+
         AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
 
         
@@ -115,6 +124,6 @@
     public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out)
         throws Exception
     {
-        out.write(createAndPopulateFrame(in));
+        out.write(createAndPopulateFrame(session, in));
     }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockEncoder.java Mon Jan 29 02:59:33 2007
@@ -28,17 +28,16 @@
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.Collections;
 
-public class AMQDataBlockEncoder implements MessageEncoder
+public final class AMQDataBlockEncoder implements MessageEncoder
 {
-	Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class);
+	private static final Logger _logger = Logger.getLogger(AMQDataBlockEncoder.class);
 
-    private Set _messageTypes;
+    private final Set _messageTypes = Collections.singleton(EncodableAMQDataBlock.class);
 
     public AMQDataBlockEncoder()
     {
-        _messageTypes = new HashSet();
-        _messageTypes.add(EncodableAMQDataBlock.class);
     }
 
     public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Mon Jan 29 02:59:33 2007
@@ -24,59 +24,52 @@
 
 public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
 {
-    public int channel;
+    private final int _channel;
+
+    private final AMQBody _bodyFrame;
 
-    public AMQBody bodyFrame;
 
-    public AMQFrame()
-    {
-    }
 
-    public AMQFrame(int channel, AMQBody bodyFrame)
+    public AMQFrame(final int channel, final AMQBody bodyFrame)
     {
-        this.channel = channel;
-        this.bodyFrame = bodyFrame;
+        _channel = channel;
+        _bodyFrame = bodyFrame;
     }
 
-    public AMQFrame(ByteBuffer in, int channel, long bodySize, BodyFactory bodyFactory) throws AMQFrameDecodingException
+    public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
     {
-        this.channel = channel;
-        this.bodyFrame = bodyFactory.createBody(in,bodySize);
+        this._channel = channel;
+        this._bodyFrame = bodyFactory.createBody(in,bodySize);
     }
 
     public long getSize()
     {
-        return 1 + 2 + 4 + bodyFrame.getSize() + 1;
+        return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
     }
 
     public void writePayload(ByteBuffer buffer)
     {
-        buffer.put(bodyFrame.getFrameType());
-        // TODO: how does channel get populated
-        EncodingUtils.writeUnsignedShort(buffer, channel);
-        EncodingUtils.writeUnsignedInteger(buffer, bodyFrame.getSize());
-        bodyFrame.writePayload(buffer);
+        buffer.put(_bodyFrame.getFrameType());
+        EncodingUtils.writeUnsignedShort(buffer, _channel);
+        EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
+        _bodyFrame.writePayload(buffer);
         buffer.put((byte) 0xCE);
     }
 
-    /**
-     *
-     * @param buffer
-     * @param channel unsigned short
-     * @param bodySize unsigned integer
-     * @param bodyFactory
-     * @throws AMQFrameDecodingException
-     */
-    public void populateFromBuffer(ByteBuffer buffer, int channel, long bodySize, BodyFactory bodyFactory)
-        throws AMQFrameDecodingException, AMQProtocolVersionException
-    {
-        this.channel = channel;
-        bodyFrame = bodyFactory.createBody(buffer, bodySize);
-      
+    public final int getChannel()
+    {
+        return _channel;
     }
 
+    public final AMQBody getBodyFrame()
+    {
+        return _bodyFrame;
+    }
+
+
+
     public String toString()
     {
-        return "Frame channelId: " + channel + ", bodyFrame: " + String.valueOf(bodyFrame);
+        return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Mon Jan 29 02:59:33 2007
@@ -57,7 +57,7 @@
 
     protected abstract void writeMethodPayload(ByteBuffer buffer);
 
-    protected byte getFrameType()
+    public byte getFrameType()
     {
         return TYPE;
     }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java Mon Jan 29 02:59:33 2007
@@ -22,30 +22,21 @@
 
 import org.apache.log4j.Logger;
 import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 public class AMQMethodBodyFactory implements BodyFactory
 {
     private static final Logger _log = Logger.getLogger(AMQMethodBodyFactory.class);
+
+    private final AMQVersionAwareProtocolSession _protocolSession;
     
-    private static final AMQMethodBodyFactory _instance = new AMQMethodBodyFactory();
-    
-    public static AMQMethodBodyFactory getInstance()
-    {
-        return _instance;
-    }
-    
-    private AMQMethodBodyFactory()
+    public AMQMethodBodyFactory(AMQVersionAwareProtocolSession protocolSession)
     {
-        _log.debug("Creating method body factory");
+        _protocolSession = protocolSession;
     }
 
     public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
     {
-        // AMQP version change: MethodBodyDecoderRegistry is obsolete, since all the XML
-        // segments generated together are now handled by MainRegistry. The Cluster class,
-        // if generated together with amqp.xml is a part of MainRegistry.
-        // TODO: Connect with version acquired from ProtocolInitiation class.
-        return MainRegistry.get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(),
-            (byte)8, (byte)0, in, bodySize);
+        return _protocolSession.getRegistry().get((short)in.getUnsignedShort(), (short)in.getUnsignedShort(), in, bodySize);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java Mon Jan 29 02:59:33 2007
@@ -6,4 +6,5 @@
 public abstract interface AMQMethodBodyInstanceFactory
 {
     public AMQMethodBody newInstance(byte major, byte minor, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+    public AMQMethodBody newInstance(byte major, byte minor, int clazzID, int methodID, ByteBuffer buffer, long size) throws AMQFrameDecodingException;
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Mon Jan 29 02:59:33 2007
@@ -260,24 +260,14 @@
 
             final AMQShortString otherString = (AMQShortString) o;
 
-            if(otherString.length() != length())
-            {
-                return false;
-            }
             if((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
             {
                 return false;
             }
-            final int size = length();
-            for(int i = 0; i < size; i++)
-            {
-                if(_data.get(i) != otherString._data.get(i))
-                {
-                    return false;
-                }
-            }
 
-            return true;
+            return _data.equals(otherString._data);
+
+            
 
 
         }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Mon Jan 29 02:59:33 2007
@@ -49,7 +49,7 @@
         this.payload = payload;
     }
 
-    protected byte getFrameType()
+    public byte getFrameType()
     {
         return TYPE;
     }
@@ -98,9 +98,7 @@
 
     public static AMQFrame createAMQFrame(int channelId, ContentBody body)
     {
-        final AMQFrame frame = new AMQFrame();
-        frame.channel = channelId;
-        frame.bodyFrame = body;
+        final AMQFrame frame = new AMQFrame(channelId, body);
         return frame;
     }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Mon Jan 29 02:59:33 2007
@@ -65,7 +65,7 @@
         this.bodySize = bodySize;
     }
 
-    protected byte getFrameType()
+    public byte getFrameType()
     {
         return TYPE;
     }
@@ -113,17 +113,11 @@
     public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,
                                           long bodySize)
     {
-        final AMQFrame frame = new AMQFrame();
-        frame.channel = channelId;
-        frame.bodyFrame = new ContentHeaderBody(classId, weight, properties, bodySize);
-        return frame;
+        return new AMQFrame(channelId, new ContentHeaderBody(classId, weight, properties, bodySize));
     }
 
     public static AMQFrame createAMQFrame(int channelId, ContentHeaderBody body)
     {
-        final AMQFrame frame = new AMQFrame();
-        frame.channel = channelId;
-        frame.bodyFrame = body;
-        return frame;
+        return new AMQFrame(channelId, body);
     }
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Mon Jan 29 02:59:33 2007
@@ -41,7 +41,7 @@
         }
     }
 
-    protected byte getFrameType()
+    public byte getFrameType()
     {
         return TYPE;
     }

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java?view=auto&rev=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java Mon Jan 29 02:59:33 2007
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.log4j.Logger;
+
+/**
+ * Lookup table of {@link AMQMethodBodyInstanceFactory} instances for a single protocol version,
+ * indexed first by class id and then by method id.
+ * <p>
+ * Both levels of the table grow on demand in {@link #registerMethod}. No synchronisation is performed here.
+ */
+public class VersionSpecificRegistry
+{
+    private static final Logger _log = Logger.getLogger(VersionSpecificRegistry.class);
+
+    private final byte _protocolMajorVersion;
+    private final byte _protocolMinorVersion;
+
+    /** Initial number of class slots in the table. */
+    private static final int DEFAULT_MAX_CLASS_ID = 200;
+    /** A newly created per-class row always has room for method ids up to at least this value. */
+    private static final int DEFAULT_MAX_METHOD_ID = 50;
+
+    private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
+
+    /**
+     * Creates an empty registry for the given protocol version.
+     *
+     * @param major value later returned by {@link #getProtocolMajorVersion()}
+     * @param minor value later returned by {@link #getProtocolMinorVersion()}
+     */
+    public VersionSpecificRegistry(byte major, byte minor)
+    {
+        _protocolMajorVersion = major;
+        _protocolMinorVersion = minor;
+    }
+
+    /** @return the major version this registry was created with */
+    public byte getProtocolMajorVersion()
+    {
+        return _protocolMajorVersion;
+    }
+
+    /** @return the minor version this registry was created with */
+    public byte getProtocolMinorVersion()
+    {
+        return _protocolMinorVersion;
+    }
+
+    /**
+     * Looks up the factory registered for a class/method pair.
+     *
+     * @param classID class id to look up
+     * @param methodID method id to look up
+     * @return the registered factory, or {@code null} if either id is out of range or nothing is registered
+     */
+    public AMQMethodBodyInstanceFactory getMethodBody(final short classID, final short methodID)
+    {
+        final AMQMethodBodyInstanceFactory[] methods = methodsForClass(classID);
+        if (methods == null || methodID < 0 || methodID >= methods.length)
+        {
+            return null;
+        }
+
+        return methods[methodID];
+    }
+
+    /**
+     * Registers (or replaces) the factory for a class/method pair, growing the table when an id does not fit.
+     *
+     * @param classID class id; negative values are not supported and fail with an index error
+     * @param methodID method id; negative values are not supported and fail with an index error
+     * @param instanceFactory factory to store for the pair
+     */
+    public void registerMethod(final short classID, final short methodID, final AMQMethodBodyInstanceFactory instanceFactory)
+    {
+        if (_registry.length <= classID)
+        {
+            final AMQMethodBodyInstanceFactory[][] grown = new AMQMethodBodyInstanceFactory[classID + 1][];
+            System.arraycopy(_registry, 0, grown, 0, _registry.length);
+            _registry = grown;
+        }
+
+        final AMQMethodBodyInstanceFactory[] methods = _registry[classID];
+        if (methods == null)
+        {
+            // First method for this class: allocate at least the default number of method slots.
+            _registry[classID] = new AMQMethodBodyInstanceFactory[Math.max(methodID, DEFAULT_MAX_METHOD_ID) + 1];
+        }
+        else if (methods.length <= methodID)
+        {
+            final AMQMethodBodyInstanceFactory[] grownMethods = new AMQMethodBodyInstanceFactory[methodID + 1];
+            System.arraycopy(methods, 0, grownMethods, 0, methods.length);
+            _registry[classID] = grownMethods;
+        }
+
+        _registry[classID][methodID] = instanceFactory;
+    }
+
+    /**
+     * Creates a method body by delegating to the factory registered for the class/method pair.
+     *
+     * @param classID class id of the method to decode
+     * @param methodID method id of the method to decode
+     * @param in buffer handed to the factory
+     * @param size body size handed to the factory
+     * @return the method body produced by the registered factory
+     * @throws AMQFrameDecodingException if no factory is registered for the pair, or if the factory itself throws it
+     */
+    public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size)
+        throws AMQFrameDecodingException
+    {
+        final AMQMethodBodyInstanceFactory[] methods = methodsForClass(classID);
+        if (methods == null)
+        {
+            throw new AMQFrameDecodingException(_log, unknownMessage("Class " + classID, classID, methodID));
+        }
+
+        if (methodID < 0 || methodID >= methods.length || methods[methodID] == null)
+        {
+            throw new AMQFrameDecodingException(_log, unknownMessage("Method " + methodID, classID, methodID));
+        }
+
+        return methods[methodID].newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
+    }
+
+    /** Returns the method row for a class id, or {@code null} when the id is out of range or has no methods. */
+    private AMQMethodBodyInstanceFactory[] methodsForClass(final short classID)
+    {
+        return (classID >= 0 && classID < _registry.length) ? _registry[classID] : null;
+    }
+
+    /** Builds the "unknown class/method" decoding error text for this registry's protocol version. */
+    private String unknownMessage(final String subject, final short classID, final short methodID)
+    {
+        return subject + " unknown in AMQP version " + _protocolMajorVersion + "-" + _protocolMinorVersion
+               + " (while trying to decode class " + classID + " method " + methodID + ").";
+    }
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?view=auto&rev=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Mon Jan 29 02:59:33 2007
@@ -0,0 +1,29 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+
+import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.framing.VersionSpecificRegistry;
+/** Combines {@link AMQProtocolWriter} and {@link ProtocolVersionAware} with access to a {@link VersionSpecificRegistry}. */
+public interface AMQVersionAwareProtocolSession extends AMQProtocolWriter, ProtocolVersionAware
+{
+    public VersionSpecificRegistry getRegistry();
+}

Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java?view=auto&rev=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolVersionAware.java Mon Jan 29 02:59:33 2007
@@ -0,0 +1,28 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.protocol;
+/** Implemented by types that can report a protocol major and minor version. */
+public interface ProtocolVersionAware
+{
+    public byte getProtocolMinorVersion();
+
+    public byte getProtocolMajorVersion();
+}

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=501003&r1=501002&r2=501003
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Mon Jan 29 02:59:33 2007
@@ -26,7 +26,6 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
 import java.util.List;
@@ -88,7 +87,7 @@
         return _messageId.getAndIncrement();
     }
 
-    public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody) throws AMQException
+    public void storeContentBodyChunk(StoreContext sc, long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
     {
 
     }